From: Peter Amstutz Date: Wed, 21 May 2014 20:00:05 +0000 (-0400) Subject: 2798: Adds client side support for Keep proxy X-Keep-Desired-Replicas and X-Git-Tag: 1.1.0~2603^2~8^2~1 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/d54a48fa6fe94e9b80bf32c1d357e4dc3b3d67c3?ds=sidebyside 2798: Adds client side support for Keep proxy X-Keep-Desired-Replicas and X-Keep-Replicas-Stored headers, and ARVADOS_KEEP_PROXY environment variable. Added tests. --- diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go index dc3ceedf45..e16c853e46 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient.go @@ -28,6 +28,7 @@ type KeepClient struct { Service_roots []string Want_replicas int Client *http.Client + Using_proxy bool } // Create a new KeepClient, initialized with standard Arvados environment @@ -35,7 +36,7 @@ type KeepClient struct { // ARVADOS_API_HOST_INSECURE. This will contact the API server to discover // Keep servers. func MakeKeepClient() (kc KeepClient, err error) { - insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") != "") + insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") == "true") kc = KeepClient{ ApiServer: os.Getenv("ARVADOS_API_HOST"), @@ -43,8 +44,8 @@ func MakeKeepClient() (kc KeepClient, err error) { ApiInsecure: insecure, Want_replicas: 2, Client: &http.Client{Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}, - }}} + TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}}, + Using_proxy: false} err = (&kc).discoverKeepServers() diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go index 1d3bbeee30..1ef5fd62de 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go @@ -161,7 +161,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { <-st.handled status := <-upload_status - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1}) }) log.Printf("TestUploadToStubKeepServer done") @@ -194,7 +194,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { <-st.handled status := <-upload_status - c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200}) + c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1}) }) log.Printf("TestUploadToStubKeepServerBufferReader done") @@ -229,8 +229,8 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { <-st.handled status := <-upload_status - c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash)) - c.Check(status.StatusCode, Equals, 500) + c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash)) + c.Check(status.statusCode, Equals, 500) }) log.Printf("TestFailedUploadToStubKeepServer done") } @@ -610,3 +610,68 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash)) } } + +type StubProxyHandler struct { + handled chan string +} + +func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + resp.Header().Set("X-Keep-Replicas-Stored", "2") + this.handled <- fmt.Sprintf("http://%s", req.Host) +} + +func (s *StandaloneSuite) TestPutProxy(c *C) { + log.Printf("TestPutProxy") + + st := StubProxyHandler{make(chan string, 1)} + + kc, _ := MakeKeepClient() + + kc.Want_replicas = 2 + kc.Using_proxy = true + kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 1) + + ks1 := RunSomeFakeKeepServers(st, 1, 2990) + + for i, k := range ks1 { + kc.Service_roots[i] = k.url + defer k.listener.Close() + } + + _, replicas, err := kc.PutB([]byte("foo")) + <-st.handled + + c.Check(err, Equals, nil) + c.Check(replicas, Equals, 2) + + log.Printf("TestPutProxy done") +} + +func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) { + log.Printf("TestPutProxy") + + st := StubProxyHandler{make(chan string, 1)} + + kc, _ := MakeKeepClient() + + kc.Want_replicas = 3 + kc.Using_proxy = true + kc.ApiToken = "abc123" + kc.Service_roots = make([]string, 1) + + ks1 := RunSomeFakeKeepServers(st, 1, 2990) + + for i, k := range ks1 { + kc.Service_roots[i] = k.url + defer k.listener.Close() + } + + _, replicas, err := kc.PutB([]byte("foo")) + <-st.handled + + c.Check(err, Equals, InsufficientReplicasError) + c.Check(replicas, Equals, 2) + + log.Printf("TestPutProxy done") +} diff --git a/sdk/go/src/arvados.org/keepclient/support.go b/sdk/go/src/arvados.org/keepclient/support.go index e657a60a39..d0ea9670fe 100644 --- a/sdk/go/src/arvados.org/keepclient/support.go +++ b/sdk/go/src/arvados.org/keepclient/support.go @@ -9,6 +9,7 @@ import ( "io" "log" "net/http" + "os" "sort" "strconv" ) @@ -17,13 +18,22 @@ type keepDisk struct { Hostname string `json:"service_host"` Port int `json:"service_port"` SSL bool `json:"service_ssl_flag"` + SvcType string `json:"service_type"` } func (this *KeepClient) discoverKeepServers() error { + if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" { + this.Service_roots = make([]string, 1) + this.Service_roots[0] = prx + this.Using_proxy = true + return nil + } + // Construct request of keep disk list var req *http.Request var err error - if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil { + + if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services/accessible", this.ApiServer), nil); err != nil { return err } @@ -36,6 +46,17 @@ func (this *KeepClient) discoverKeepServers() error { return err } + if resp.StatusCode != 200 { + // fall back on keep disks + if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil { + return err + } + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + if resp, err = this.Client.Do(req); err != nil { + return err + } + } + type svcList struct { Items []keepDisk `json:"items"` } @@ -52,6 +73,7 @@ func (this *KeepClient) discoverKeepServers() error { for _, element := range m.Items { n := "" + if element.SSL { n = "s" } @@ -64,6 +86,9 @@ func (this *KeepClient) discoverKeepServers() error { listed[url] = true this.Service_roots = append(this.Service_roots, url) } + if element.SvcType == "proxy" { + this.Using_proxy = true + } } // Must be sorted for ShuffledServiceRoots() to produce consistent @@ -122,9 +147,10 @@ func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) { } type uploadStatus struct { - Err error - Url string - StatusCode int + err error + url string + statusCode int + replicas_stored int } func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser, @@ -136,7 +162,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read var err error var url = fmt.Sprintf("%s/%s", host, hash) if req, err = http.NewRequest("PUT", url, nil); err != nil { - upload_status <- uploadStatus{err, url, 0} + upload_status <- uploadStatus{err, url, 0, 0} body.Close() return } @@ -147,18 +173,28 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) req.Header.Add("Content-Type", "application/octet-stream") + + if this.Using_proxy { + req.Header.Add("X-Keep-Desired-Replicas", fmt.Sprint(this.Want_replicas)) + } + req.Body = body var resp *http.Response if resp, err = this.Client.Do(req); err != nil { - upload_status <- uploadStatus{err, url, 0} + upload_status <- uploadStatus{err, url, 0, 0} return } + rep := 1 + if xr := resp.Header.Get("X-Keep-Replicas-Stored"); xr != "" { + fmt.Sscanf(xr, "%d", &rep) + } + if resp.StatusCode == http.StatusOK { - upload_status <- uploadStatus{nil, url, resp.StatusCode} + upload_status <- uploadStatus{nil, url, resp.StatusCode, rep} } else { - upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode} + upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep} } } @@ -181,6 +217,7 @@ func (this KeepClient) putReplicas( defer close(upload_status) // Desired number of replicas + remaining_replicas := this.Want_replicas for remaining_replicas > 0 { @@ -191,23 +228,28 @@ func (this KeepClient) putReplicas( next_server += 1 active += 1 } else { - return (this.Want_replicas - remaining_replicas), InsufficientReplicasError + fmt.Print(active) + if active == 0 { + return (this.Want_replicas - remaining_replicas), InsufficientReplicasError + } else { + break + } } } // Now wait for something to happen. status := <-upload_status - if status.StatusCode == 200 { + if status.statusCode == 200 { // good news! - remaining_replicas -= 1 + remaining_replicas -= status.replicas_stored } else { // writing to keep server failed for some reason log.Printf("Keep server put to %v failed with '%v'", - status.Url, status.Err) + status.url, status.err) } active -= 1 - log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active) + log.Printf("Upload status %v %v %v", status.statusCode, remaining_replicas, active) } - return (this.Want_replicas - remaining_replicas), nil + return this.Want_replicas, nil }