2798: Added AuthorizedGet(), Ask() and AuthorizedAsk(). Added BLOCKSIZE
authorPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 15 May 2014 20:42:33 +0000 (16:42 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 15 May 2014 20:42:33 +0000 (16:42 -0400)
constant and moved errors.New() declarations to the top of the file.  Improved
test server runner.  Changed some test methods to take copies of KeepClient
instead of pointer.

sdk/go/src/arvados.org/keepclient/keepclient.go
sdk/go/src/arvados.org/keepclient/keepclient_test.go

index aeb805b7a0db9c6d1f3b2af97599612c61acf4a7..2738cefa7c3144ec613aa8b6f8d6922e33debc44 100644 (file)
@@ -15,13 +15,19 @@ import (
        "strconv"
 )
 
+// A Keep "block" is 64MB.
+const BLOCKSIZE = 64 * 1024 * 1024
+
+var BlockNotFound = errors.New("Block not found")
+var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
+
 type KeepClient struct {
        ApiServer     string
        ApiToken      string
        ApiInsecure   bool
        Service_roots []string
        Want_replicas int
-       client        *http.Client
+       Client        *http.Client
 }
 
 type KeepDisk struct {
@@ -30,25 +36,24 @@ type KeepDisk struct {
        SSL      bool   `json:"service_ssl_flag"`
 }
 
-func MakeKeepClient() (kc *KeepClient, err error) {
-       kc = &KeepClient{
-               ApiServer:     os.Getenv("ARVADOS_API_HOST"),
-               ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
-               ApiInsecure:   (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""),
-               Want_replicas: 2}
-
+func MakeKeepClient() (kc KeepClient, err error) {
        tr := &http.Transport{
                TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure},
        }
 
-       kc.client = &http.Client{Transport: tr}
+       kc = KeepClient{
+               ApiServer:     os.Getenv("ARVADOS_API_HOST"),
+               ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
+               ApiInsecure:   (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""),
+               Want_replicas: 2,
+               Client:        &http.Client{Transport: tr}}
 
-       err = kc.DiscoverKeepDisks()
+       err = (&kc).DiscoverKeepServers()
 
        return kc, err
 }
 
-func (this *KeepClient) DiscoverKeepDisks() error {
+func (this *KeepClient) DiscoverKeepServers() error {
        // Construct request of keep disk list
        var req *http.Request
        var err error
@@ -61,7 +66,7 @@ func (this *KeepClient) DiscoverKeepDisks() error {
 
        // Make the request
        var resp *http.Response
-       if resp, err = this.client.Do(req); err != nil {
+       if resp, err = this.Client.Do(req); err != nil {
                return err
        }
 
@@ -415,7 +420,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
        req.Body = body
 
        var resp *http.Response
-       if resp, err = this.client.Do(req); err != nil {
+       if resp, err = this.Client.Do(req); err != nil {
                upload_status <- UploadStatus{err, url, 0}
                return
        }
@@ -427,8 +432,6 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
        }
 }
 
-var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-
 func (this KeepClient) putReplicas(
        hash string,
        requests chan ReadRequest,
@@ -496,12 +499,12 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
        // Buffer for reads from 'r'
        var buffer []byte
        if expectedLength > 0 {
-               if expectedLength > 64*1024*1024 {
+               if expectedLength > BLOCKSIZE {
                        return 0, OversizeBlockError
                }
                buffer = make([]byte, expectedLength)
        } else {
-               buffer = make([]byte, 64*1024*1024)
+               buffer = make([]byte, BLOCKSIZE)
        }
 
        // Read requests on Transfer() buffer
@@ -543,18 +546,29 @@ func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error)
        }
 }
 
-var BlockNotFound = errors.New("Block not found")
-
 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
        contentLength int64, url string, err error) {
+       return this.AuthorizedGet(hash, "", "")
+}
 
-       // Calculate the ordering for uploading to servers
+func (this KeepClient) AuthorizedGet(hash string,
+       signature string,
+       timestamp string) (reader io.ReadCloser,
+       contentLength int64, url string, err error) {
+
+       // Calculate the ordering for asking servers
        sv := this.ShuffledServiceRoots(hash)
 
        for _, host := range sv {
                var req *http.Request
                var err error
-               var url = fmt.Sprintf("%s/%s", host, hash)
+               var url string
+               if signature != "" {
+                       url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+                               signature, timestamp)
+               } else {
+                       url = fmt.Sprintf("%s/%s", host, hash)
+               }
                if req, err = http.NewRequest("GET", url, nil); err != nil {
                        continue
                }
@@ -562,7 +576,7 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser,
                req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
 
                var resp *http.Response
-               if resp, err = this.client.Do(req); err != nil {
+               if resp, err = this.Client.Do(req); err != nil {
                        continue
                }
 
@@ -573,3 +587,42 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser,
 
        return nil, 0, "", BlockNotFound
 }
+
+func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
+       return this.AuthorizedAsk(hash, "", "")
+}
+
+func (this KeepClient) AuthorizedAsk(hash string, signature string,
+       timestamp string) (contentLength int64, url string, err error) {
+       // Calculate the ordering for asking servers
+       sv := this.ShuffledServiceRoots(hash)
+
+       for _, host := range sv {
+               var req *http.Request
+               var err error
+               if signature != "" {
+                       url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+                               signature, timestamp)
+               } else {
+                       url = fmt.Sprintf("%s/%s", host, hash)
+               }
+
+               if req, err = http.NewRequest("HEAD", url, nil); err != nil {
+                       continue
+               }
+
+               req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+
+               var resp *http.Response
+               if resp, err = this.Client.Do(req); err != nil {
+                       continue
+               }
+
+               if resp.StatusCode == http.StatusOK {
+                       return resp.ContentLength, url, nil
+               }
+       }
+
+       return 0, "", BlockNotFound
+
+}
index 00a2063cb6b46094dc42a3456382c2c7c17ae35c..3d38c60007e9c5192771a85eb84adfd8fa846c1e 100644 (file)
@@ -13,6 +13,7 @@ import (
        "os"
        "os/exec"
        "sort"
+       "strings"
        "testing"
        "time"
 )
@@ -32,18 +33,23 @@ type ServerRequiredSuite struct{}
 // Standalone tests
 type StandaloneSuite struct{}
 
+func pythonDir() string {
+       gopath := os.Getenv("GOPATH")
+       return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0])
+}
+
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
        if *no_server {
                c.Skip("Skipping tests that require server")
        } else {
-               os.Chdir(os.ExpandEnv("$GOPATH../python"))
+               os.Chdir(pythonDir())
                exec.Command("python", "run_test_server.py", "start").Run()
                exec.Command("python", "run_test_server.py", "start_keep").Run()
        }
 }
 
 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
-       os.Chdir(os.ExpandEnv("$GOPATH../python"))
+       os.Chdir(pythonDir())
        exec.Command("python", "run_test_server.py", "stop_keep").Run()
        exec.Command("python", "run_test_server.py", "stop").Run()
 }
@@ -464,7 +470,7 @@ func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url s
        return listener, url
 }
 
-func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
+func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
        io.ReadCloser, io.WriteCloser, chan UploadStatus)) {
 
        listener, url := RunBogusKeepServer(st, 2990)
@@ -488,7 +494,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
                make(chan string)}
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, reader io.ReadCloser,
+               func(kc KeepClient, url string, reader io.ReadCloser,
                        writer io.WriteCloser, upload_status chan UploadStatus) {
 
                        go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
@@ -511,7 +517,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                make(chan string)}
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, reader io.ReadCloser,
+               func(kc KeepClient, url string, reader io.ReadCloser,
                        writer io.WriteCloser, upload_status chan UploadStatus) {
 
                        // Buffer for reads from 'r'
@@ -559,7 +565,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
        hash := "acbd18db4cc2f85cedef654fccc4a4d8"
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, reader io.ReadCloser,
+               func(kc KeepClient, url string, reader io.ReadCloser,
                        writer io.WriteCloser, upload_status chan UploadStatus) {
 
                        go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
@@ -869,7 +875,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
        c.Check(content, DeepEquals, []byte("foo"))
 }
 
-func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
+func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
        os.Setenv("ARVADOS_API_HOST", "localhost:3001")
        os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
        os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
@@ -882,12 +888,21 @@ func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
        c.Check(replicas, Equals, 2)
        c.Check(err, Equals, nil)
 
-       r, n, url2, err := kc.Get(hash)
-       c.Check(err, Equals, nil)
-       c.Check(n, Equals, int64(3))
-       c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+       {
+               r, n, url2, err := kc.Get(hash)
+               c.Check(err, Equals, nil)
+               c.Check(n, Equals, int64(3))
+               c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
 
-       content, err2 := ioutil.ReadAll(r)
-       c.Check(err2, Equals, nil)
-       c.Check(content, DeepEquals, []byte("foo"))
+               content, err2 := ioutil.ReadAll(r)
+               c.Check(err2, Equals, nil)
+               c.Check(content, DeepEquals, []byte("foo"))
+       }
+
+       {
+               n, url2, err := kc.Ask(hash)
+               c.Check(err, Equals, nil)
+               c.Check(n, Equals, int64(3))
+               c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+       }
 }