X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9ae180ec18f1f397889a4531a12999942edd003a..3e4f591181a38d739d94cf9321f874c19937d199:/services/keepproxy/keepproxy.go diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go index af81ba242e..d3dbeaf89e 100644 --- a/services/keepproxy/keepproxy.go +++ b/services/keepproxy/keepproxy.go @@ -14,6 +14,8 @@ import ( "net/http" "os" "os/signal" + "reflect" + "regexp" "sync" "syscall" "time" @@ -135,24 +137,25 @@ type ApiTokenCache struct { // Refresh the keep service list every five minutes. func RefreshServicesList(kc *keepclient.KeepClient) { - previousRoots := "" + var previousRoots = []map[string]string{} + var delay time.Duration = 0 for { + time.Sleep(delay * time.Second) + delay = 300 if err := kc.DiscoverKeepServers(); err != nil { log.Println("Error retrieving services list:", err) - time.Sleep(3*time.Second) - previousRoots = "" - } else if len(kc.LocalRoots()) == 0 { - log.Println("Received empty services list") - time.Sleep(3*time.Second) - previousRoots = "" - } else { - newRoots := fmt.Sprint("Locals ", kc.LocalRoots(), ", gateways ", kc.GatewayRoots()) - if newRoots != previousRoots { - log.Println("Updated services list:", newRoots) - previousRoots = newRoots - } - time.Sleep(300*time.Second) + delay = 3 + continue + } + newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()} + if !reflect.DeepEqual(previousRoots, newRoots) { + log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1]) + } + if len(newRoots[0]) == 0 { + log.Print("WARNING: No local services. Retrying in 3 seconds.") + delay = 3 } + previousRoots = newRoots } } @@ -238,6 +241,11 @@ type PutBlockHandler struct { *ApiTokenCache } +type IndexHandler struct { + *keepclient.KeepClient + *ApiTokenCache +} + type InvalidPathHandler struct{} type OptionsHandler struct{} @@ -259,6 +267,12 @@ func MakeRESTRouter( rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD") rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD") + + // List all blocks + rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET") + + // List blocks whose hash has the given prefix + rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET") } if enable_put { @@ -295,6 +309,8 @@ var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header var ContentLengthMismatch = errors.New("Actual length != expected content length") var MethodNotSupported = errors.New("Method not supported") +var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)") + func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { SetCorsHeaders(resp) @@ -327,6 +343,8 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques var reader io.ReadCloser + locator = removeHint.ReplaceAllString(locator, "$1") + switch req.Method { case "HEAD": expectLength, proxiedURI, err = kc.Ask(locator) @@ -344,7 +362,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided") } - switch err { + switch respErr := err.(type) { case nil: status = http.StatusOK resp.Header().Set("Content-Length", fmt.Sprint(expectLength)) @@ -357,10 +375,16 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques err = ContentLengthMismatch } } - case keepclient.BlockNotFound: - status = http.StatusNotFound + case keepclient.Error: + if respErr == keepclient.BlockNotFound { + status = http.StatusNotFound + } else if respErr.Temporary() { + status = http.StatusBadGateway + } else { + status = 422 + } default: - status = http.StatusBadGateway + status = http.StatusInternalServerError } } @@ -400,10 +424,16 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques return } - if loc := keepclient.MakeLocator(locatorIn); loc.Size > 0 && int64(loc.Size) != expectLength { - err = LengthMismatchError - status = http.StatusBadRequest - return + if locatorIn != "" { + var loc *keepclient.Locator + if loc, err = keepclient.MakeLocator(locatorIn); err != nil { + status = http.StatusBadRequest + return + } else if loc.Size > 0 && int64(loc.Size) != expectLength { + err = LengthMismatchError + status = http.StatusBadRequest + return + } } var pass bool @@ -468,3 +498,63 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques status = http.StatusBadGateway } } + +// ServeHTTP implementation for IndexHandler +// Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}} +// For each keep server found in LocalRoots: +// Invokes GetIndex using keepclient +// Expects "complete" response (terminating with blank new line) +// Aborts on any errors +// Concatenates responses from all those keep servers and returns +func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + SetCorsHeaders(resp) + + prefix := mux.Vars(req)["prefix"] + var err error + var status int + + defer func() { + if status != http.StatusOK { + http.Error(resp, err.Error(), status) + } + }() + + kc := *handler.KeepClient + + ok, token := CheckAuthorizationHeader(kc, handler.ApiTokenCache, req) + if !ok { + status, err = http.StatusForbidden, BadAuthorizationHeader + return + } + + // Copy ArvadosClient struct and use the client's API token + arvclient := *kc.Arvados + arvclient.ApiToken = token + kc.Arvados = &arvclient + + // Only GET method is supported + if req.Method != "GET" { + status, err = http.StatusNotImplemented, MethodNotSupported + return + } + + // Get index from all LocalRoots and write to resp + var reader io.Reader + for uuid := range kc.LocalRoots() { + reader, err = kc.GetIndex(uuid, prefix) + if err != nil { + status = http.StatusBadGateway + return + } + + _, err = io.Copy(resp, reader) + if err != nil { + status = http.StatusBadGateway + return + } + } + + // Got index from all the keep servers and wrote to resp + status = http.StatusOK + resp.Write([]byte("\n")) +}