7200: add GetIndex function to keepclient; add IndexHandler to keepproxy.
author radhika <radhika@curoverse.com>
Mon, 28 Sep 2015 14:10:48 +0000 (10:10 -0400)
committer radhika <radhika@curoverse.com>
Mon, 28 Sep 2015 14:10:48 +0000 (10:10 -0400)
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go

index 70aa3746bcc117932beba6313ed1b3d9a0ac0a3d..a7522eb2e6b4a2b4deec5bd019cd9658fb9b9e4b 100644 (file)
@@ -28,6 +28,9 @@ var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.It
// Sentinel errors reported by KeepClient. Callers compare against these
// values directly, so the variables (and their messages) must stay stable.
var (
	// MissingArvadosApiHost: ARVADOS_API_HOST is not set.
	MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
	// MissingArvadosApiToken: ARVADOS_API_TOKEN is not set.
	MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
	// InvalidLocatorError: a block locator could not be parsed.
	InvalidLocatorError = errors.New("Invalid locator")

	// NoSuchKeepServer: GetIndex was given a UUID absent from LocalRoots.
	NoSuchKeepServer = errors.New("No keep server matching the given UUID is found")
	// GetIndexError: the index request could not be built, sent, or read,
	// or the server answered with a non-200 status.
	GetIndexError = errors.New("Error getting index")
	// IncompleteIndexError: the index body did not end with a blank line.
	IncompleteIndexError = errors.New("Got incomplete index")
)
 
 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
@@ -182,6 +185,55 @@ func (kc *KeepClient) Ask(locator string) (int64, string, error) {
        return 0, "", BlockNotFound
 }
 
+// GetIndex retrieves a list of blocks stored on the given server whose hashes
+// begin with the given prefix. The returned reader will return an error (other
+// than EOF) if the complete index cannot be retrieved. This should only be
+// expected to return useful results if the client is using a "data manager token"
+// recognized by the Keep services.
+func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) {
+       url := kc.LocalRoots()[keepServiceUUID]
+       if url == "" {
+               log.Printf("No such keep server found: %v", keepServiceUUID)
+               return nil, NoSuchKeepServer
+       }
+
+       url += "/index"
+       if prefix != "" {
+               url += "/" + prefix
+       }
+
+       req, err := http.NewRequest("GET", url, nil)
+       if err != nil {
+               log.Printf("GET index error: %v", err)
+               return nil, GetIndexError
+       }
+
+       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+       resp, err := kc.Client.Do(req)
+       if err != nil || resp.StatusCode != http.StatusOK {
+               log.Printf("GET index error: %v; status code: %v", err, resp.StatusCode)
+               return nil, GetIndexError
+       }
+
+       var respbody []byte
+       if resp.Body != nil {
+               respbody, err = ioutil.ReadAll(resp.Body)
+               if err != nil {
+                       log.Printf("GET index error: %v", err)
+                       return nil, GetIndexError
+               }
+
+               // Got index; verify that it is complete
+               if !strings.HasSuffix(string(respbody), "\n\n") {
+                       log.Printf("Got incomplete index for %v", url)
+                       return nil, IncompleteIndexError
+               }
+       }
+
+       // Got complete index or "" if no locators matching prefix
+       return strings.NewReader(string(respbody)), nil
+}
+
 // LocalRoots() returns the map of local (i.e., disk and proxy) Keep
 // services: uuid -> baseURI.
 func (kc *KeepClient) LocalRoots() map[string]string {
index e4e459e83a51c408141c7127a098f69e955a005e..f83bee14a7cc551b29e86432ef8c847d7da04147 100644 (file)
@@ -948,3 +948,135 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
        c.Check(err, Equals, InsufficientReplicasError)
        c.Check(replicas, Equals, 0)
 }
+
// StubGetIndexHandler is a fake Keep server handler for the GetIndex tests:
// it checks the incoming request and replies with a canned status and body.
// Field order matters for any positional composite literals of this type.
type StubGetIndexHandler struct {
	c              *C     // test context used to check the request
	expectPath     string // URL path the request must carry
	expectApiToken string // token expected as "OAuth2 <token>" in Authorization
	httpStatus     int    // status code to reply with
	body           []byte // response body to send
}
+
+func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       h.c.Check(req.URL.Path, Equals, h.expectPath)
+       h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectApiToken))
+       resp.WriteHeader(h.httpStatus)
+       resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
+       resp.Write(h.body)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := StubGetIndexHandler{
+               c,
+               "/index",
+               "abc123",
+               http.StatusOK,
+               []byte(string(hash) + "\n\n")}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       r, err := kc.GetIndex("x", "")
+       c.Check(err, Equals, nil)
+
+       content, err2 := ioutil.ReadAll(r)
+       c.Check(err2, Equals, nil)
+       c.Check(content, DeepEquals, st.body)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := StubGetIndexHandler{
+               c,
+               "/index/" + hash[0:3],
+               "abc123",
+               http.StatusOK,
+               []byte(string(hash) + "\n\n")}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       r, err := kc.GetIndex("x", hash[0:3])
+       c.Check(err, Equals, nil)
+
+       content, err2 := ioutil.ReadAll(r)
+       c.Check(err2, Equals, nil)
+       c.Check(content, DeepEquals, st.body)
+}
+
+func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := StubGetIndexHandler{
+               c,
+               "/index/" + hash[0:3],
+               "abc123",
+               http.StatusOK,
+               []byte(string(hash))}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       _, err = kc.GetIndex("x", hash[0:3])
+       c.Check(err, Equals, IncompleteIndexError)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+       st := StubGetIndexHandler{
+               c,
+               "/index/" + hash[0:3],
+               "abc123",
+               http.StatusOK,
+               []byte(string(hash))}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       _, err = kc.GetIndex("y", hash[0:3])
+       c.Check(err, Equals, NoSuchKeepServer)
+}
+
+func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
+       st := StubGetIndexHandler{
+               c,
+               "/index/xyz",
+               "abc123",
+               http.StatusOK,
+               []byte("")}
+
+       ks := RunFakeKeepServer(st)
+       defer ks.listener.Close()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+       arv.ApiToken = "abc123"
+       kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+       _, err = kc.GetIndex("x", "xyz")
+       c.Check((err != nil), Equals, true)
+}
index d0af4a58ea5e7746d8243fb6272820e8c4801307..788cbfcdcf17ac018c9a445602d8e853f8b3b6fd 100644 (file)
@@ -16,6 +16,7 @@ import (
        "os/signal"
        "reflect"
        "regexp"
+       "strings"
        "sync"
        "syscall"
        "time"
@@ -241,6 +242,11 @@ type PutBlockHandler struct {
        *ApiTokenCache
 }
 
// IndexHandler serves GET /index and GET /index/{prefix} by combining the
// indexes of the proxy's local Keep services. The embedded ApiTokenCache is
// used when checking the caller's Authorization header.
// Constructed positionally (IndexHandler{kc, t}), so field order matters.
type IndexHandler struct {
	*keepclient.KeepClient
	*ApiTokenCache
}
+
 type InvalidPathHandler struct{}
 
 type OptionsHandler struct{}
@@ -262,6 +268,12 @@ func MakeRESTRouter(
                rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
                        GetBlockHandler{kc, t}).Methods("GET", "HEAD")
                rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+
+               // List all blocks
+               rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
+
+               // List blocks whose hash has the given prefix
+               rest.Handle(`/index/{prefix}`, IndexHandler{kc, t}).Methods("GET")
        }
 
        if enable_put {
@@ -481,3 +493,70 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                status = http.StatusBadGateway
        }
 }
+
+func (this IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       SetCorsHeaders(resp)
+
+       prefix := mux.Vars(req)["prefix"]
+       var err error
+       var status int
+
+       defer func() {
+               if status != http.StatusOK {
+                       http.Error(resp, err.Error(), status)
+               }
+       }()
+
+       kc := *this.KeepClient
+
+       var pass bool
+       var tok string
+       if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+               status, err = http.StatusForbidden, BadAuthorizationHeader
+               return
+       }
+
+       // Copy ArvadosClient struct and use the client's API token
+       arvclient := *kc.Arvados
+       arvclient.ApiToken = tok
+       kc.Arvados = &arvclient
+
+       var indexResp []byte
+       var reader io.Reader
+
+       switch req.Method {
+       case "GET":
+               for uuid, _ := range kc.LocalRoots() {
+                       reader, err = kc.GetIndex(uuid, prefix)
+                       if err != nil {
+                               break
+                       }
+
+                       var readBytes []byte
+                       readBytes, err = ioutil.ReadAll(reader)
+                       if err != nil {
+                               break
+                       }
+
+                       // Got index; verify that it is complete
+                       if !strings.HasSuffix(string(readBytes), "\n\n") {
+                               err = errors.New("Got incomplete index")
+                       }
+
+                       indexResp = append(indexResp, (readBytes[0 : len(readBytes)-1])...)
+               }
+               indexResp = append(indexResp, ([]byte("\n"))...)
+       default:
+               status, err = http.StatusNotImplemented, MethodNotSupported
+               return
+       }
+
+       switch err {
+       case nil:
+               status = http.StatusOK
+               resp.Header().Set("Content-Length", fmt.Sprint(len(indexResp)))
+               _, err = resp.Write(indexResp)
+       default:
+               status = http.StatusBadGateway
+       }
+}
index 22cc72ea15ea4b23b308e355a0292cec808557d2..a08d1b603929424259ff0a65ad6eb7d80e227c97 100644 (file)
@@ -406,3 +406,53 @@ func (s *ServerRequiredSuite) TestStripHint(c *C) {
                "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
 
 }
+
+func (s *ServerRequiredSuite) TestGetIndex(c *C) {
+       kc := runProxy(c, []string{"keepproxy"}, 28852, false)
+       waitForListener()
+       defer closeListener()
+
+       data := []byte("index-data")
+       hash := fmt.Sprintf("%x", md5.Sum(data))
+
+       hash2, rep, err := kc.PutB(data)
+       c.Check(hash2, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, hash))
+       c.Check(rep, Equals, 2)
+       c.Check(err, Equals, nil)
+
+       reader, blocklen, _, err := kc.Get(hash)
+       c.Assert(err, Equals, nil)
+       c.Check(blocklen, Equals, int64(10))
+       all, err := ioutil.ReadAll(reader)
+       c.Check(all, DeepEquals, data)
+
+       // GetIndex with no prefix
+       indexReader, err := kc.GetIndex("proxy", "")
+       c.Assert(err, Equals, nil)
+       indexResp, err := ioutil.ReadAll(indexReader)
+       locators := strings.Split(string(indexResp), "\n")
+       count := 0
+       for _, locator := range locators {
+               if strings.HasPrefix(locator, hash) {
+                       count += 1
+               }
+       }
+       c.Assert(2, Equals, count)
+
+       // GetIndex with prefix
+       indexReader, err = kc.GetIndex("proxy", hash[0:3])
+       c.Assert(err, Equals, nil)
+       indexResp, err = ioutil.ReadAll(indexReader)
+       locators = strings.Split(string(indexResp), "\n")
+       count = 0
+       for _, locator := range locators {
+               if strings.HasPrefix(locator, hash) {
+                       count += 1
+               }
+       }
+       c.Assert(2, Equals, count)
+
+       // GetIndex with no such prefix
+       indexReader, err = kc.GetIndex("proxy", "xyz")
+       c.Assert((err != nil), Equals, true)
+}