X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d53271e587b7bdbfe37b8ae8eaf890dd69c2796b..29d5ca4976ef491f04ed88a286656b2a94453c06:/services/keepproxy/keepproxy.go diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go index f2a93f1e3a..226c388ae2 100644 --- a/services/keepproxy/keepproxy.go +++ b/services/keepproxy/keepproxy.go @@ -14,7 +14,6 @@ import ( "net/http" "os" "os/signal" - "reflect" "regexp" "sync" "syscall" @@ -22,8 +21,8 @@ import ( ) // Default TCP address on which to listen for requests. -// Initialized by the -listen flag. -const DEFAULT_ADDR = ":25107" +// Override with -listen. +const DefaultAddr = ":25107" var listener net.Listener @@ -37,12 +36,12 @@ func main() { pidfile string ) - flagset := flag.NewFlagSet("default", flag.ExitOnError) + flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError) flagset.StringVar( &listen, "listen", - DEFAULT_ADDR, + DefaultAddr, "Interface on which to listen for requests, in the format "+ "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+ "to listen on all network interfaces.") @@ -84,6 +83,9 @@ func main() { log.Fatalf("Error setting up arvados client %s", err.Error()) } + if os.Getenv("ARVADOS_DEBUG") != "" { + keepclient.DebugPrintf = log.Printf + } kc, err := keepclient.MakeKeepClient(&arv) if err != nil { log.Fatalf("Error setting up keep client %s", err.Error()) @@ -100,15 +102,14 @@ func main() { } kc.Want_replicas = default_replicas - kc.Client.Timeout = time.Duration(timeout) * time.Second + go kc.RefreshServices(5*time.Minute, 3*time.Second) listener, err = net.Listen("tcp", listen) if err != nil { log.Fatalf("Could not listen on %v", listen) } - - go RefreshServicesList(kc) + log.Printf("Arvados Keep proxy started listening on %v", listener.Addr()) // Shut down the server gracefully (by closing the listener) // if SIGTERM is received. @@ -121,9 +122,7 @@ func main() { signal.Notify(term, syscall.SIGTERM) signal.Notify(term, syscall.SIGINT) - log.Printf("Arvados Keep proxy started listening on %v", listener.Addr()) - - // Start listening for requests. + // Start serving requests. http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc)) log.Println("shutting down") @@ -135,30 +134,6 @@ type ApiTokenCache struct { expireTime int64 } -// Refresh the keep service list every five minutes. -func RefreshServicesList(kc *keepclient.KeepClient) { - var previousRoots = []map[string]string{} - var delay time.Duration = 0 - for { - time.Sleep(delay * time.Second) - delay = 300 - if err := kc.DiscoverKeepServers(); err != nil { - log.Println("Error retrieving services list:", err) - delay = 3 - continue - } - newRoots := []map[string]string{kc.LocalRoots(), kc.GatewayRoots()} - if !reflect.DeepEqual(previousRoots, newRoots) { - log.Printf("Updated services list: locals %v gateways %v", newRoots[0], newRoots[1]) - } - if len(newRoots[0]) == 0 { - log.Print("WARNING: No local services. Retrying in 3 seconds.") - delay = 3 - } - previousRoots = newRoots - } -} - // Cache the token and set an expire time. If we already have an expire time // on the token, it is not updated. func (this *ApiTokenCache) RememberToken(token string) { @@ -191,17 +166,13 @@ func (this *ApiTokenCache) RecallToken(token string) bool { } func GetRemoteAddress(req *http.Request) string { - if realip := req.Header.Get("X-Real-IP"); realip != "" { - if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip { - return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded) - } else { - return realip - } + if xff := req.Header.Get("X-Forwarded-For"); xff != "" { + return xff + "," + req.RemoteAddr } return req.RemoteAddr } -func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) { +func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) { var auth string if auth = req.Header.Get("Authorization"); auth == "" { return false, "" @@ -214,7 +185,7 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re } if cache.RecallToken(tok) { - // Valid in the cache, short circut + // Valid in the cache, short circuit return true, tok } @@ -331,7 +302,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques var pass bool var tok string - if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass { + if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass { status, err = http.StatusForbidden, BadAuthorizationHeader return } @@ -362,7 +333,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided") } - switch err { + switch respErr := err.(type) { case nil: status = http.StatusOK resp.Header().Set("Content-Length", fmt.Sprint(expectLength)) @@ -375,10 +346,16 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques err = ContentLengthMismatch } } - case keepclient.BlockNotFound: - status = http.StatusNotFound + case keepclient.Error: + if respErr == keepclient.BlockNotFound { + status = http.StatusNotFound + } else if respErr.Temporary() { + status = http.StatusBadGateway + } else { + status = 422 + } default: - status = http.StatusBadGateway + status = http.StatusInternalServerError } } @@ -390,7 +367,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques kc := *this.KeepClient var err error - var expectLength int64 = -1 + var expectLength int64 var status = http.StatusInternalServerError var wroteReplicas int var locatorOut string = "-" @@ -404,15 +381,8 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques locatorIn := mux.Vars(req)["locator"] - if req.Header.Get("Content-Length") != "" { - _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength) - if err != nil { - resp.Header().Set("Content-Length", fmt.Sprintf("%d", expectLength)) - } - - } - - if expectLength < 0 { + _, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength) + if err != nil || expectLength < 0 { err = LengthRequiredError status = http.StatusLengthRequired return @@ -432,7 +402,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques var pass bool var tok string - if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass { + if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass { err = BadAuthorizationHeader status = http.StatusForbidden return @@ -447,7 +417,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques if req.Header.Get("X-Keep-Desired-Replicas") != "" { var r int _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r) - if err != nil { + if err == nil { kc.Want_replicas = r } } @@ -515,16 +485,15 @@ func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques kc := *handler.KeepClient - var pass bool - var tok string - if pass, tok = CheckAuthorizationHeader(kc, handler.ApiTokenCache, req); !pass { + ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req) + if !ok { status, err = http.StatusForbidden, BadAuthorizationHeader return } // Copy ArvadosClient struct and use the client's API token arvclient := *kc.Arvados - arvclient.ApiToken = tok + arvclient.ApiToken = token kc.Arvados = &arvclient // Only GET method is supported @@ -533,6 +502,7 @@ func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques return } + // Get index from all LocalRoots and write to resp var reader io.Reader for uuid := range kc.LocalRoots() { reader, err = kc.GetIndex(uuid, prefix) @@ -541,15 +511,7 @@ func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques return } - var readBytes []byte - readBytes, err = ioutil.ReadAll(reader) - if err != nil { - status = http.StatusBadGateway - return - } - - // Got index for this server; write to resp - _, err := resp.Write(readBytes) + _, err = io.Copy(resp, reader) if err != nil { status = http.StatusBadGateway return