X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1b2afc5aa599eb452a1f30e706e19b964e26cae0..3e4f591181a38d739d94cf9321f874c19937d199:/services/keepproxy/keepproxy.go

diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index ebdb6eb376..d3dbeaf89e 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -15,6 +15,7 @@ import (
 	"os"
 	"os/signal"
 	"reflect"
+	"regexp"
 	"sync"
 	"syscall"
 	"time"
@@ -240,6 +241,13 @@ type PutBlockHandler struct {
 	*ApiTokenCache
 }
 
+// IndexHandler serves the GET /index and /index/{prefix} routes, using the
+// embedded KeepClient and ApiTokenCache.
+type IndexHandler struct {
+	*keepclient.KeepClient
+	*ApiTokenCache
+}
+
 type InvalidPathHandler struct{}
 
 type OptionsHandler struct{}
@@ -261,6 +269,12 @@ func MakeRESTRouter(
 		rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
 			GetBlockHandler{kc, t}).Methods("GET", "HEAD")
 		rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+
+		// List all blocks
+		rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
+
+		// List blocks whose hash has the given prefix
+		rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
 	}
 
 	if enable_put {
@@ -297,6 +311,11 @@ var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header
 var ContentLengthMismatch = errors.New("Actual length != expected content length")
 var MethodNotSupported = errors.New("Method not supported")
 
+// removeHint matches a "+K@xxxxx" hint (five characters from [a-z0-9]) in a
+// locator. The "+" or end-of-string that follows is captured so callers can
+// put it back with "$1". MustCompile panics at start-up on a bad pattern.
+var removeHint = regexp.MustCompile("\\+K@[a-z0-9]{5}(\\+|$)")
+
 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	SetCorsHeaders(resp)
 
@@ -329,6 +348,9 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
 	var reader io.ReadCloser
 
+	// Drop any +K@xxxxx hint from the locator before it is passed to kc.
+	locator = removeHint.ReplaceAllString(locator, "$1")
+
 	switch req.Method {
 	case "HEAD":
 		expectLength, proxiedURI, err = kc.Ask(locator)
@@ -346,7 +368,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 		log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
 	}
 
-	switch err {
+	switch respErr := err.(type) {
 	case nil:
 		status = http.StatusOK
 		resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
@@ -359,10 +381,16 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 				err = ContentLengthMismatch
 			}
 		}
-	case keepclient.BlockNotFound:
-		status = http.StatusNotFound
+	case keepclient.Error:
+		if respErr == keepclient.BlockNotFound {
+			status = http.StatusNotFound
+		} else if respErr.Temporary() {
+			status = http.StatusBadGateway
+		} else {
+			status = 422 // Unprocessable Entity
+		}
 	default:
-		status = http.StatusBadGateway
+		status = http.StatusInternalServerError
 	}
 }
 
@@ -476,3 +504,77 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 		status = http.StatusBadGateway
 	}
 }
+
+// ServeHTTP implements http.Handler for IndexHandler.
+//
+// Supports only GET requests for /index and /index/{prefix:[0-9a-f]{0,32}}.
+// For each keep server found in LocalRoots:
+//   - invokes GetIndex using keepclient
+//   - expects "complete" response (terminating with blank new line)
+//   - aborts on any errors
+//
+// Concatenates responses from all those keep servers and returns them,
+// followed by one terminating newline.
+//
+// Responds 403 when the Authorization header is missing or invalid, 501 for
+// non-GET methods, and 502 when fetching or relaying any server's index fails.
+func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	SetCorsHeaders(resp)
+
+	// prefix is empty for the bare /index route.
+	prefix := mux.Vars(req)["prefix"]
+	var err error
+	var status int
+
+	// Report failures on the way out. Every early return below sets both
+	// status and err, so err is non-nil whenever status != http.StatusOK.
+	defer func() {
+		if status != http.StatusOK {
+			http.Error(resp, err.Error(), status)
+		}
+	}()
+
+	// Work on a copy so that swapping in the request's token below does not
+	// modify the handler's shared KeepClient.
+	kc := *handler.KeepClient
+
+	ok, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req)
+	if !ok {
+		status, err = http.StatusForbidden, BadAuthorizationHeader
+		return
+	}
+
+	// Copy ArvadosClient struct and use the client's API token
+	arvclient := *kc.Arvados
+	arvclient.ApiToken = token
+	kc.Arvados = &arvclient
+
+	// Only GET method is supported
+	if req.Method != "GET" {
+		status, err = http.StatusNotImplemented, MethodNotSupported
+		return
+	}
+
+	// Get index from all LocalRoots and write to resp
+	var reader io.Reader
+	for uuid := range kc.LocalRoots() {
+		reader, err = kc.GetIndex(uuid, prefix)
+		if err != nil {
+			status = http.StatusBadGateway
+			return
+		}
+
+		// NOTE(review): if this copy fails part-way, index data has already
+		// been written to resp, so the 502 below presumably arrives too late
+		// to replace the response status -- confirm.
+		_, err = io.Copy(resp, reader)
+		if err != nil {
+			status = http.StatusBadGateway
+			return
+		}
+	}
+
+	// Got index from all the keep servers and wrote to resp
+	status = http.StatusOK
+	resp.Write([]byte("\n"))
+}