var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
var InvalidLocatorError = errors.New("Invalid locator")
+var NoSuchKeepServer = errors.New("No keep server matching the given UUID is found") // returned by GetIndex when the UUID is not among the local roots
+var GetIndexError = errors.New("Error getting index") // returned by GetIndex when the request cannot be built, fails, gets a non-200 status, or the body cannot be read
+var IncompleteIndexError = errors.New("Got incomplete index") // returned by GetIndex when the response body does not end with "\n\n"
const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
return 0, "", BlockNotFound
}
+// GetIndex retrieves a list of blocks stored on the given server whose hashes
+// begin with the given prefix. The returned reader will return an error (other
+// than EOF) if the complete index cannot be retrieved. This should only be
+// expected to return useful results if the client is using a "data manager token"
+// recognized by the Keep services.
+func (kc *KeepClient) GetIndex(keepServiceUUID, prefix string) (io.Reader, error) {
+ url := kc.LocalRoots()[keepServiceUUID]
+ if url == "" {
+ log.Printf("No such keep server found: %v", keepServiceUUID)
+ return nil, NoSuchKeepServer
+ }
+
+ url += "/index"
+ if prefix != "" {
+ url += "/" + prefix
+ }
+
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ log.Printf("GET index error: %v", err)
+ return nil, GetIndexError
+ }
+
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ resp, err := kc.Client.Do(req)
+ if err != nil || resp.StatusCode != http.StatusOK {
+ log.Printf("GET index error: %v; status code: %v", err, resp.StatusCode)
+ return nil, GetIndexError
+ }
+
+ var respbody []byte
+ if resp.Body != nil {
+ respbody, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ log.Printf("GET index error: %v", err)
+ return nil, GetIndexError
+ }
+
+ // Got index; verify that it is complete
+ if !strings.HasSuffix(string(respbody), "\n\n") {
+ log.Printf("Got incomplete index for %v", url)
+ return nil, IncompleteIndexError
+ }
+ }
+
+ // Got complete index or "" if no locators matching prefix
+ return strings.NewReader(string(respbody)), nil
+}
+
// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
// services: uuid -> baseURI.
func (kc *KeepClient) LocalRoots() map[string]string {
c.Check(err, Equals, InsufficientReplicasError)
c.Check(replicas, Equals, 0)
}
+
+type StubGetIndexHandler struct { // test HTTP handler that checks an index request and replies with a canned response
+ c *C // check context used for assertions inside ServeHTTP
+ expectPath string // request URL path the handler expects
+ expectApiToken string // token expected in the "OAuth2 <token>" Authorization header
+ httpStatus int // status code written to the response
+ body []byte // response body written after the status
+}
+
+// ServeHTTP checks that the request path and Authorization header match the
+// expected values, then replies with the configured status code and body.
+func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	h.c.Check(req.URL.Path, Equals, h.expectPath)
+	h.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", h.expectApiToken))
+	// Headers must be set before WriteHeader; changes made afterwards are
+	// not sent to the client.
+	resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(h.body)))
+	resp.WriteHeader(h.httpStatus)
+	resp.Write(h.body)
+}
+
+// TestGetIndexWithNoPrefix requests the full index ("/index") from a stub
+// server and expects the stub's body to be returned unchanged.
+func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	st := StubGetIndexHandler{
+		c,
+		"/index",
+		"abc123",
+		http.StatusOK,
+		[]byte(hash + "\n\n")}
+
+	ks := RunFakeKeepServer(st)
+	defer ks.listener.Close()
+
+	// The client construction error is not checked here (as before); the
+	// token and service roots are set explicitly below.
+	arv, _ := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+	r, err := kc.GetIndex("x", "")
+	// Assert rather than Check: r is nil when GetIndex fails, and reading
+	// from a nil reader would panic instead of reporting the failure.
+	c.Assert(err, Equals, nil)
+
+	content, err2 := ioutil.ReadAll(r)
+	c.Check(err2, Equals, nil)
+	c.Check(content, DeepEquals, st.body)
+}
+
+// TestGetIndexWithPrefix requests the index for a three-character hash prefix
+// ("/index/<prefix>") and expects the stub's body to be returned unchanged.
+func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	st := StubGetIndexHandler{
+		c,
+		"/index/" + hash[0:3],
+		"abc123",
+		http.StatusOK,
+		[]byte(hash + "\n\n")}
+
+	ks := RunFakeKeepServer(st)
+	defer ks.listener.Close()
+
+	// The client construction error is not checked here (as before); the
+	// token and service roots are set explicitly below.
+	arv, _ := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+	r, err := kc.GetIndex("x", hash[0:3])
+	// Assert rather than Check: r is nil when GetIndex fails, and reading
+	// from a nil reader would panic instead of reporting the failure.
+	c.Assert(err, Equals, nil)
+
+	content, err2 := ioutil.ReadAll(r)
+	c.Check(err2, Equals, nil)
+	c.Check(content, DeepEquals, st.body)
+}
+
+// TestGetIndexIncomplete serves a body that lacks the terminating blank line
+// and expects GetIndex to report IncompleteIndexError.
+func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+	prefix := hash[0:3]
+
+	st := StubGetIndexHandler{
+		c:              c,
+		expectPath:     "/index/" + prefix,
+		expectApiToken: "abc123",
+		httpStatus:     http.StatusOK,
+		body:           []byte(hash),
+	}
+
+	ks := RunFakeKeepServer(st)
+	defer ks.listener.Close()
+
+	arv, _ := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	localRoots := map[string]string{"x": ks.url}
+	writableRoots := map[string]string{ks.url: ""}
+	kc.SetServiceRoots(localRoots, writableRoots, nil)
+
+	_, err := kc.GetIndex("x", prefix)
+	c.Check(err, Equals, IncompleteIndexError)
+}
+
+// TestGetIndexWithNoSuchServer asks for the index of a service UUID ("y")
+// that was never registered and expects NoSuchKeepServer.
+func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+	prefix := hash[0:3]
+
+	st := StubGetIndexHandler{
+		c:              c,
+		expectPath:     "/index/" + prefix,
+		expectApiToken: "abc123",
+		httpStatus:     http.StatusOK,
+		body:           []byte(hash),
+	}
+
+	ks := RunFakeKeepServer(st)
+	defer ks.listener.Close()
+
+	arv, _ := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	localRoots := map[string]string{"x": ks.url}
+	writableRoots := map[string]string{ks.url: ""}
+	kc.SetServiceRoots(localRoots, writableRoots, nil)
+
+	_, err := kc.GetIndex("y", prefix)
+	c.Check(err, Equals, NoSuchKeepServer)
+}
+
+// TestGetIndexWithNoSuchPrefix serves an empty body for the prefix "xyz" and
+// expects GetIndex to return some non-nil error.
+func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
+	st := StubGetIndexHandler{
+		c:              c,
+		expectPath:     "/index/xyz",
+		expectApiToken: "abc123",
+		httpStatus:     http.StatusOK,
+		body:           []byte(""),
+	}
+
+	ks := RunFakeKeepServer(st)
+	defer ks.listener.Close()
+
+	arv, _ := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	localRoots := map[string]string{"x": ks.url}
+	writableRoots := map[string]string{ks.url: ""}
+	kc.SetServiceRoots(localRoots, writableRoots, nil)
+
+	_, err := kc.GetIndex("x", "xyz")
+	failed := err != nil
+	c.Check(failed, Equals, true)
+}
"os/signal"
"reflect"
"regexp"
+ "strings"
"sync"
"syscall"
"time"
*ApiTokenCache
}
+type IndexHandler struct { // HTTP handler registered for the /index and /index/{prefix} routes
+ *keepclient.KeepClient // client copied per request; its GetIndex is called for each local root
+ *ApiTokenCache // passed to CheckAuthorizationHeader together with the request
+}
+
type InvalidPathHandler struct{} // declares no fields
type OptionsHandler struct{} // declares no fields
rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
GetBlockHandler{kc, t}).Methods("GET", "HEAD")
rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+
+ // List all blocks
+ rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
+
+ // List blocks whose hash has the given prefix
+ rest.Handle(`/index/{prefix}`, IndexHandler{kc, t}).Methods("GET")
}
if enable_put {
status = http.StatusBadGateway
}
}
+
+// ServeHTTP answers index requests by concatenating the indexes of every
+// local Keep service known to the embedded client.
+//
+// The optional "prefix" route variable restricts the listing to matching
+// hashes. The request's token (as returned by CheckAuthorizationHeader) is
+// used for the upstream GetIndex calls. Responses:
+//   - 403 with BadAuthorizationHeader if the authorization check fails.
+//   - 501 with MethodNotSupported for any method other than GET.
+//   - 502 if any upstream index cannot be fetched, read, or is incomplete.
+//   - 200 with the combined index, terminated by a single blank line.
+func (this IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	SetCorsHeaders(resp)
+
+	prefix := mux.Vars(req)["prefix"]
+	var err error
+	var status int
+
+	// Every exit path below sets status (and err when it is not OK); the
+	// error response is written here in one place.
+	defer func() {
+		if status != http.StatusOK {
+			http.Error(resp, err.Error(), status)
+		}
+	}()
+
+	kc := *this.KeepClient
+
+	pass, tok := CheckAuthorizationHeader(kc, this.ApiTokenCache, req)
+	if !pass {
+		status, err = http.StatusForbidden, BadAuthorizationHeader
+		return
+	}
+
+	// Copy ArvadosClient struct and use the client's API token
+	arvclient := *kc.Arvados
+	arvclient.ApiToken = tok
+	kc.Arvados = &arvclient
+
+	if req.Method != "GET" {
+		status, err = http.StatusNotImplemented, MethodNotSupported
+		return
+	}
+
+	var indexResp []byte
+	for uuid := range kc.LocalRoots() {
+		var reader io.Reader
+		reader, err = kc.GetIndex(uuid, prefix)
+		if err != nil {
+			break
+		}
+
+		var readBytes []byte
+		readBytes, err = ioutil.ReadAll(reader)
+		if err != nil {
+			break
+		}
+
+		// Got index; verify that it is complete. Stop immediately on an
+		// incomplete index: otherwise the next iteration would overwrite
+		// err and the truncated data would end up in the response.
+		if !strings.HasSuffix(string(readBytes), "\n\n") {
+			err = errors.New("Got incomplete index")
+			break
+		}
+
+		// Drop this server's terminating blank line; a single one is
+		// appended after all servers have been merged.
+		indexResp = append(indexResp, readBytes[:len(readBytes)-1]...)
+	}
+
+	if err != nil {
+		status = http.StatusBadGateway
+		return
+	}
+
+	indexResp = append(indexResp, '\n')
+	status = http.StatusOK
+	resp.Header().Set("Content-Length", fmt.Sprint(len(indexResp)))
+	_, err = resp.Write(indexResp)
+}
"http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
}
+
+// TestGetIndex stores a block through the proxy, then checks that the proxy's
+// index lists it twice (once per replica reported by PutB) both with no prefix
+// and with a matching prefix, and that an unmatched prefix yields an error.
+func (s *ServerRequiredSuite) TestGetIndex(c *C) {
+	kc := runProxy(c, []string{"keepproxy"}, 28852, false)
+	waitForListener()
+	defer closeListener()
+
+	data := []byte("index-data")
+	hash := fmt.Sprintf("%x", md5.Sum(data))
+
+	hash2, rep, err := kc.PutB(data)
+	c.Check(hash2, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, hash))
+	c.Check(rep, Equals, 2)
+	c.Check(err, Equals, nil)
+
+	reader, blocklen, _, err := kc.Get(hash)
+	c.Assert(err, Equals, nil)
+	c.Check(blocklen, Equals, int64(10))
+	all, err := ioutil.ReadAll(reader)
+	c.Check(err, Equals, nil)
+	c.Check(all, DeepEquals, data)
+
+	// countMatches returns how many index lines start with the stored hash.
+	countMatches := func(index []byte) int {
+		n := 0
+		for _, locator := range strings.Split(string(index), "\n") {
+			if strings.HasPrefix(locator, hash) {
+				n++
+			}
+		}
+		return n
+	}
+
+	// GetIndex with no prefix
+	indexReader, err := kc.GetIndex("proxy", "")
+	c.Assert(err, Equals, nil)
+	indexResp, err := ioutil.ReadAll(indexReader)
+	c.Assert(err, Equals, nil)
+	c.Assert(countMatches(indexResp), Equals, 2)
+
+	// GetIndex with prefix
+	indexReader, err = kc.GetIndex("proxy", hash[0:3])
+	c.Assert(err, Equals, nil)
+	indexResp, err = ioutil.ReadAll(indexReader)
+	c.Assert(err, Equals, nil)
+	c.Assert(countMatches(indexResp), Equals, 2)
+
+	// GetIndex with no such prefix
+	_, err = kc.GetIndex("proxy", "xyz")
+	c.Assert(err != nil, Equals, true)
+}