2798: Adds client side support for Keep proxy X-Keep-Desired-Replicas and
author Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 21 May 2014 20:00:05 +0000 (16:00 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 21 May 2014 20:00:05 +0000 (16:00 -0400)
X-Keep-Replicas-Stored headers, and ARVADOS_KEEP_PROXY environment variable.
Added tests.

sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go
sdk/go/src/arvados.org/keepclient/support.go

index dc3ceedf454281d6d88f3469dadab2fe8b56f573..e16c853e4685cd36e8442301f96b1832e4bdd566 100644 (file)
@@ -28,6 +28,7 @@ type KeepClient struct {
        Service_roots []string
        Want_replicas int
        Client        *http.Client
+       Using_proxy   bool
 }
 
 // Create a new KeepClient, initialized with standard Arvados environment
@@ -35,7 +36,7 @@ type KeepClient struct {
 // ARVADOS_API_HOST_INSECURE.  This will contact the API server to discover
 // Keep servers.
 func MakeKeepClient() (kc KeepClient, err error) {
-       insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") != "")
+       insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") == "true")
 
        kc = KeepClient{
                ApiServer:     os.Getenv("ARVADOS_API_HOST"),
@@ -43,8 +44,8 @@ func MakeKeepClient() (kc KeepClient, err error) {
                ApiInsecure:   insecure,
                Want_replicas: 2,
                Client: &http.Client{Transport: &http.Transport{
-                       TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure},
-               }}}
+                       TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+               Using_proxy: false}
 
        err = (&kc).discoverKeepServers()
 
index 1d3bbeee308761be39b189ecfac044f6d009ff97..1ef5fd62de81f2b322eb58d842cc88f07fb59091 100644 (file)
@@ -161,7 +161,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 
                        <-st.handled
                        status := <-upload_status
-                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
+                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
                })
 
        log.Printf("TestUploadToStubKeepServer done")
@@ -194,7 +194,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                        <-st.handled
 
                        status := <-upload_status
-                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200})
+                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
                })
 
        log.Printf("TestUploadToStubKeepServerBufferReader done")
@@ -229,8 +229,8 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
                        <-st.handled
 
                        status := <-upload_status
-                       c.Check(status.Url, Equals, fmt.Sprintf("%s/%s", url, hash))
-                       c.Check(status.StatusCode, Equals, 500)
+                       c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
+                       c.Check(status.statusCode, Equals, 500)
                })
        log.Printf("TestFailedUploadToStubKeepServer done")
 }
@@ -610,3 +610,68 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
                c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
        }
 }
+
+// StubProxyHandler is a test stub; it reports each request on handled.
+type StubProxyHandler struct{ handled chan string }
+
+// ServeHTTP claims two stored replicas, then publishes the request's host URL.
+func (stub StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       resp.Header().Set("X-Keep-Replicas-Stored", "2")
+       stub.handled <- "http://" + req.Host
+}
+
+func (s *StandaloneSuite) TestPutProxy(c *C) {
+       log.Printf("TestPutProxy")
+
+       st := StubProxyHandler{make(chan string, 1)}
+
+       kc, _ := MakeKeepClient()
+
+       kc.Want_replicas = 2
+       kc.Using_proxy = true
+       kc.ApiToken = "abc123"
+       kc.Service_roots = make([]string, 1)
+
+       ks1 := RunSomeFakeKeepServers(st, 1, 2990)
+
+       for i, k := range ks1 {
+               kc.Service_roots[i] = k.url
+               defer k.listener.Close()
+       }
+
+       _, replicas, err := kc.PutB([]byte("foo"))
+       <-st.handled
+
+       c.Check(err, Equals, nil)
+       c.Check(replicas, Equals, 2)
+
+       log.Printf("TestPutProxy done")
+}
+
+func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
+       log.Printf("TestPutProxy")
+
+       st := StubProxyHandler{make(chan string, 1)}
+
+       kc, _ := MakeKeepClient()
+
+       kc.Want_replicas = 3
+       kc.Using_proxy = true
+       kc.ApiToken = "abc123"
+       kc.Service_roots = make([]string, 1)
+
+       ks1 := RunSomeFakeKeepServers(st, 1, 2990)
+
+       for i, k := range ks1 {
+               kc.Service_roots[i] = k.url
+               defer k.listener.Close()
+       }
+
+       _, replicas, err := kc.PutB([]byte("foo"))
+       <-st.handled
+
+       c.Check(err, Equals, InsufficientReplicasError)
+       c.Check(replicas, Equals, 2)
+
+       log.Printf("TestPutProxy done")
+}
index e657a60a39e46d668253436b61c7a191504188cf..d0ea9670fe22f819471d947808ec22c9b1e132d5 100644 (file)
@@ -9,6 +9,7 @@ import (
        "io"
        "log"
        "net/http"
+       "os"
        "sort"
        "strconv"
 )
@@ -17,13 +18,22 @@ type keepDisk struct {
        Hostname string `json:"service_host"`
        Port     int    `json:"service_port"`
        SSL      bool   `json:"service_ssl_flag"`
+       SvcType  string `json:"service_type"`
 }
 
 func (this *KeepClient) discoverKeepServers() error {
+       if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
+               this.Service_roots = make([]string, 1)
+               this.Service_roots[0] = prx
+               this.Using_proxy = true
+               return nil
+       }
+
        // Construct request of keep disk list
        var req *http.Request
        var err error
-       if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
+
+       if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services/accessible", this.ApiServer), nil); err != nil {
                return err
        }
 
@@ -36,6 +46,17 @@ func (this *KeepClient) discoverKeepServers() error {
                return err
        }
 
+       if resp.StatusCode != 200 {
+               // fall back on keep disks
+               if req, err = http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_disks", this.ApiServer), nil); err != nil {
+                       return err
+               }
+               req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+               if resp, err = this.Client.Do(req); err != nil {
+                       return err
+               }
+       }
+
        type svcList struct {
                Items []keepDisk `json:"items"`
        }
@@ -52,6 +73,7 @@ func (this *KeepClient) discoverKeepServers() error {
 
        for _, element := range m.Items {
                n := ""
+
                if element.SSL {
                        n = "s"
                }
@@ -64,6 +86,9 @@ func (this *KeepClient) discoverKeepServers() error {
                        listed[url] = true
                        this.Service_roots = append(this.Service_roots, url)
                }
+               if element.SvcType == "proxy" {
+                       this.Using_proxy = true
+               }
        }
 
        // Must be sorted for ShuffledServiceRoots() to produce consistent
@@ -122,9 +147,10 @@ func (this KeepClient) shuffledServiceRoots(hash string) (pseq []string) {
 }
 
 type uploadStatus struct {
-       Err        error
-       Url        string
-       StatusCode int
+       err             error
+       url             string
+       statusCode      int
+       replicas_stored int
 }
 
 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
@@ -136,7 +162,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
        var err error
        var url = fmt.Sprintf("%s/%s", host, hash)
        if req, err = http.NewRequest("PUT", url, nil); err != nil {
-               upload_status <- uploadStatus{err, url, 0}
+               upload_status <- uploadStatus{err, url, 0, 0}
                body.Close()
                return
        }
@@ -147,18 +173,28 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 
        req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
        req.Header.Add("Content-Type", "application/octet-stream")
+
+       if this.Using_proxy {
+               req.Header.Add("X-Keep-Desired-Replicas", fmt.Sprint(this.Want_replicas))
+       }
+
        req.Body = body
 
        var resp *http.Response
        if resp, err = this.Client.Do(req); err != nil {
-               upload_status <- uploadStatus{err, url, 0}
+               upload_status <- uploadStatus{err, url, 0, 0}
                return
        }
 
+       rep := 1
+       if xr := resp.Header.Get("X-Keep-Replicas-Stored"); xr != "" {
+               fmt.Sscanf(xr, "%d", &rep)
+       }
+
        if resp.StatusCode == http.StatusOK {
-               upload_status <- uploadStatus{nil, url, resp.StatusCode}
+               upload_status <- uploadStatus{nil, url, resp.StatusCode, rep}
        } else {
-               upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode}
+               upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep}
        }
 }
 
@@ -181,6 +217,7 @@ func (this KeepClient) putReplicas(
        defer close(upload_status)
 
        // Desired number of replicas
+
        remaining_replicas := this.Want_replicas
 
        for remaining_replicas > 0 {
@@ -191,23 +228,28 @@ func (this KeepClient) putReplicas(
                                next_server += 1
                                active += 1
                        } else {
-                               return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+                               fmt.Print(active)
+                               if active == 0 {
+                                       return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+                               } else {
+                                       break
+                               }
                        }
                }
 
                // Now wait for something to happen.
                status := <-upload_status
-               if status.StatusCode == 200 {
+               if status.statusCode == 200 {
                        // good news!
-                       remaining_replicas -= 1
+                       remaining_replicas -= status.replicas_stored
                } else {
                        // writing to keep server failed for some reason
                        log.Printf("Keep server put to %v failed with '%v'",
-                               status.Url, status.Err)
+                               status.url, status.err)
                }
                active -= 1
-               log.Printf("Upload status %v %v %v", status.StatusCode, remaining_replicas, active)
+               log.Printf("Upload status %v %v %v", status.statusCode, remaining_replicas, active)
        }
 
-       return (this.Want_replicas - remaining_replicas), nil
+       return this.Want_replicas, nil
 }