From: Peter Amstutz Date: Thu, 15 May 2014 20:42:33 +0000 (-0400) Subject: 2798: Added AuthorizedGet(), Ask() and AuthorizedAsk(). Added BLOCKSIZE X-Git-Tag: 1.1.0~2603^2~8^2~11 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/c3a88cbf511aa0954dac271ce6bda9c6e4f3191c 2798: Added AuthorizedGet(), Ask() and AuthorizedAsk(). Added BLOCKSIZE constant and moved errors.New() declarations to the top of the file. Improved test server runner. Changed some test methods to take copies of KeepClient instead of pointer. --- diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go index aeb805b7a0..2738cefa7c 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient.go @@ -15,13 +15,19 @@ import ( "strconv" ) +// A Keep "block" is 64MB. +const BLOCKSIZE = 64 * 1024 * 1024 + +var BlockNotFound = errors.New("Block not found") +var InsufficientReplicasError = errors.New("Could not write sufficient replicas") + type KeepClient struct { ApiServer string ApiToken string ApiInsecure bool Service_roots []string Want_replicas int - client *http.Client + Client *http.Client } type KeepDisk struct { @@ -30,25 +36,24 @@ type KeepDisk struct { SSL bool `json:"service_ssl_flag"` } -func MakeKeepClient() (kc *KeepClient, err error) { - kc = &KeepClient{ - ApiServer: os.Getenv("ARVADOS_API_HOST"), - ApiToken: os.Getenv("ARVADOS_API_TOKEN"), - ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""), - Want_replicas: 2} - +func MakeKeepClient() (kc KeepClient, err error) { tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure}, } - kc.client = &http.Client{Transport: tr} + kc = KeepClient{ + ApiServer: os.Getenv("ARVADOS_API_HOST"), + ApiToken: os.Getenv("ARVADOS_API_TOKEN"), + ApiInsecure: (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""), + Want_replicas: 2, + Client: &http.Client{Transport: tr}} - err = kc.DiscoverKeepDisks() + err = (&kc).DiscoverKeepServers() return kc, err } -func (this *KeepClient) DiscoverKeepDisks() error { +func (this *KeepClient) DiscoverKeepServers() error { // Construct request of keep disk list var req *http.Request var err error @@ -61,7 +66,7 @@ func (this *KeepClient) DiscoverKeepDisks() error { // Make the request var resp *http.Response - if resp, err = this.client.Do(req); err != nil { + if resp, err = this.Client.Do(req); err != nil { return err } @@ -415,7 +420,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read req.Body = body var resp *http.Response - if resp, err = this.client.Do(req); err != nil { + if resp, err = this.Client.Do(req); err != nil { upload_status <- UploadStatus{err, url, 0} return } @@ -427,8 +432,6 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read } } -var InsufficientReplicasError = errors.New("Could not write sufficient replicas") - func (this KeepClient) putReplicas( hash string, requests chan ReadRequest, @@ -496,12 +499,12 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re // Buffer for reads from 'r' var buffer []byte if expectedLength > 0 { - if expectedLength > 64*1024*1024 { + if expectedLength > BLOCKSIZE { return 0, OversizeBlockError } buffer = make([]byte, expectedLength) } else { - buffer = make([]byte, 64*1024*1024) + buffer = make([]byte, BLOCKSIZE) } // Read requests on Transfer() buffer @@ -543,18 +546,29 @@ func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) } } -var BlockNotFound = errors.New("Block not found") - func (this KeepClient) Get(hash string) (reader io.ReadCloser, contentLength int64, url string, err error) { + return this.AuthorizedGet(hash, "", "") +} - // Calculate the ordering for uploading to servers +func (this KeepClient) AuthorizedGet(hash string, + signature string, + timestamp string) (reader io.ReadCloser, + contentLength int64, url string, err error) { + + // Calculate the ordering for asking servers sv := this.ShuffledServiceRoots(hash) for _, host := range sv { var req *http.Request var err error - var url = fmt.Sprintf("%s/%s", host, hash) + var url string + if signature != "" { + url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, + signature, timestamp) + } else { + url = fmt.Sprintf("%s/%s", host, hash) + } if req, err = http.NewRequest("GET", url, nil); err != nil { continue } @@ -562,7 +576,7 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser, req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) var resp *http.Response - if resp, err = this.client.Do(req); err != nil { + if resp, err = this.Client.Do(req); err != nil { continue } @@ -573,3 +587,42 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser, return nil, 0, "", BlockNotFound } + +func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) { + return this.AuthorizedAsk(hash, "", "") +} + +func (this KeepClient) AuthorizedAsk(hash string, signature string, + timestamp string) (contentLength int64, url string, err error) { + // Calculate the ordering for asking servers + sv := this.ShuffledServiceRoots(hash) + + for _, host := range sv { + var req *http.Request + var err error + if signature != "" { + url = fmt.Sprintf("%s/%s+A%s@%s", host, hash, + signature, timestamp) + } else { + url = fmt.Sprintf("%s/%s", host, hash) + } + + if req, err = http.NewRequest("HEAD", url, nil); err != nil { + continue + } + + req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken)) + + var resp *http.Response + if resp, err = this.Client.Do(req); err != nil { + continue + } + + if resp.StatusCode == http.StatusOK { + return resp.ContentLength, url, nil + } + } + + return 0, "", BlockNotFound + +} diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go index 00a2063cb6..3d38c60007 100644 --- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go +++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go @@ -13,6 +13,7 @@ import ( "os" "os/exec" "sort" + "strings" "testing" "time" ) @@ -32,18 +33,23 @@ type ServerRequiredSuite struct{} // Standalone tests type StandaloneSuite struct{} +func pythonDir() string { + gopath := os.Getenv("GOPATH") + return fmt.Sprintf("%s/../python", strings.Split(gopath, ":")[0]) +} + func (s *ServerRequiredSuite) SetUpSuite(c *C) { if *no_server { c.Skip("Skipping tests that require server") } else { - os.Chdir(os.ExpandEnv("$GOPATH../python")) + os.Chdir(pythonDir()) exec.Command("python", "run_test_server.py", "start").Run() exec.Command("python", "run_test_server.py", "start_keep").Run() } } func (s *ServerRequiredSuite) TearDownSuite(c *C) { - os.Chdir(os.ExpandEnv("$GOPATH../python")) + os.Chdir(pythonDir()) exec.Command("python", "run_test_server.py", "stop_keep").Run() exec.Command("python", "run_test_server.py", "stop").Run() } @@ -464,7 +470,7 @@ func RunBogusKeepServer(st http.Handler, port int) (listener net.Listener, url s return listener, url } -func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string, +func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string, io.ReadCloser, io.WriteCloser, chan UploadStatus)) { listener, url := RunBogusKeepServer(st, 2990) @@ -488,7 +494,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) { make(chan string)} UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, + func(kc KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadStatus) { go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo"))) @@ -511,7 +517,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) { make(chan string)} UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, + func(kc KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadStatus) { // Buffer for reads from 'r' @@ -559,7 +565,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) { hash := "acbd18db4cc2f85cedef654fccc4a4d8" UploadToStubHelper(c, st, - func(kc *KeepClient, url string, reader io.ReadCloser, + func(kc KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan UploadStatus) { go kc.uploadToKeepServer(url, hash, reader, upload_status, 3) @@ -869,7 +875,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) { c.Check(content, DeepEquals, []byte("foo")) } -func (s *ServerRequiredSuite) TestPutAndGet(c *C) { +func (s *ServerRequiredSuite) TestPutGetHead(c *C) { os.Setenv("ARVADOS_API_HOST", "localhost:3001") os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h") os.Setenv("ARVADOS_API_HOST_INSECURE", "true") @@ -882,12 +888,21 @@ func (s *ServerRequiredSuite) TestPutAndGet(c *C) { c.Check(replicas, Equals, 2) c.Check(err, Equals, nil) - r, n, url2, err := kc.Get(hash) - c.Check(err, Equals, nil) - c.Check(n, Equals, int64(3)) - c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash)) + { + r, n, url2, err := kc.Get(hash) + c.Check(err, Equals, nil) + c.Check(n, Equals, int64(3)) + c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash)) - content, err2 := ioutil.ReadAll(r) - c.Check(err2, Equals, nil) - c.Check(content, DeepEquals, []byte("foo")) + content, err2 := ioutil.ReadAll(r) + c.Check(err2, Equals, nil) + c.Check(content, DeepEquals, []byte("foo")) + } + + { + n, url2, err := kc.Ask(hash) + c.Check(err, Equals, nil) + c.Check(n, Equals, int64(3)) + c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash)) + } }