9187: If a container is reported Queued, but we are monitoring it, stop monitoring it.
[arvados.git] / services / keepproxy / keepproxy.go
index af81ba242e08d9199d2e79f65a79dce6b0c26c68..4cd931037ef830dfd8a6b25022126c84c13d7036 100644 (file)
@@ -14,14 +14,15 @@ import (
        "net/http"
        "os"
        "os/signal"
+       "regexp"
        "sync"
        "syscall"
        "time"
 )
 
 // Default TCP address on which to listen for requests.
-// Initialized by the -listen flag.
-const DEFAULT_ADDR = ":25107"
+// Override with -listen.
+const DefaultAddr = ":25107"
 
 var listener net.Listener
 
@@ -35,12 +36,12 @@ func main() {
                pidfile          string
        )
 
-       flagset := flag.NewFlagSet("default", flag.ExitOnError)
+       flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
 
        flagset.StringVar(
                &listen,
                "listen",
-               DEFAULT_ADDR,
+               DefaultAddr,
                "Interface on which to listen for requests, in the format "+
                        "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
                        "to listen on all network interfaces.")
@@ -82,6 +83,9 @@ func main() {
                log.Fatalf("Error setting up arvados client %s", err.Error())
        }
 
+       if os.Getenv("ARVADOS_DEBUG") != "" {
+               keepclient.DebugPrintf = log.Printf
+       }
        kc, err := keepclient.MakeKeepClient(&arv)
        if err != nil {
                log.Fatalf("Error setting up keep client %s", err.Error())
@@ -98,15 +102,14 @@ func main() {
        }
 
        kc.Want_replicas = default_replicas
-
        kc.Client.Timeout = time.Duration(timeout) * time.Second
+       go kc.RefreshServices(5*time.Minute, 3*time.Second)
 
        listener, err = net.Listen("tcp", listen)
        if err != nil {
                log.Fatalf("Could not listen on %v", listen)
        }
-
-       go RefreshServicesList(kc)
+       log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
 
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
@@ -119,9 +122,7 @@ func main() {
        signal.Notify(term, syscall.SIGTERM)
        signal.Notify(term, syscall.SIGINT)
 
-       log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
-
-       // Start listening for requests.
+       // Start serving requests.
        http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
 
        log.Println("shutting down")
@@ -133,29 +134,6 @@ type ApiTokenCache struct {
        expireTime int64
 }
 
-// Refresh the keep service list every five minutes.
-func RefreshServicesList(kc *keepclient.KeepClient) {
-       previousRoots := ""
-       for {
-               if err := kc.DiscoverKeepServers(); err != nil {
-                       log.Println("Error retrieving services list:", err)
-                       time.Sleep(3*time.Second)
-                       previousRoots = ""
-               } else if len(kc.LocalRoots()) == 0 {
-                       log.Println("Received empty services list")
-                       time.Sleep(3*time.Second)
-                       previousRoots = ""
-               } else {
-                       newRoots := fmt.Sprint("Locals ", kc.LocalRoots(), ", gateways ", kc.GatewayRoots())
-                       if newRoots != previousRoots {
-                               log.Println("Updated services list:", newRoots)
-                               previousRoots = newRoots
-                       }
-                       time.Sleep(300*time.Second)
-               }
-       }
-}
-
 // Cache the token and set an expire time.  If we already have an expire time
 // on the token, it is not updated.
 func (this *ApiTokenCache) RememberToken(token string) {
@@ -188,17 +166,13 @@ func (this *ApiTokenCache) RecallToken(token string) bool {
 }
 
 func GetRemoteAddress(req *http.Request) string {
-       if realip := req.Header.Get("X-Real-IP"); realip != "" {
-               if forwarded := req.Header.Get("X-Forwarded-For"); forwarded != realip {
-                       return fmt.Sprintf("%s (X-Forwarded-For %s)", realip, forwarded)
-               } else {
-                       return realip
-               }
+       if xff := req.Header.Get("X-Forwarded-For"); xff != "" {
+               return xff + "," + req.RemoteAddr
        }
        return req.RemoteAddr
 }
 
-func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
+func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) (pass bool, tok string) {
        var auth string
        if auth = req.Header.Get("Authorization"); auth == "" {
                return false, ""
@@ -211,7 +185,7 @@ func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, re
        }
 
        if cache.RecallToken(tok) {
-               // Valid in the cache, short circut
+               // Valid in the cache, short circuit
                return true, tok
        }
 
@@ -238,6 +212,11 @@ type PutBlockHandler struct {
        *ApiTokenCache
 }
 
+type IndexHandler struct {
+       *keepclient.KeepClient
+       *ApiTokenCache
+}
+
 type InvalidPathHandler struct{}
 
 type OptionsHandler struct{}
@@ -259,6 +238,12 @@ func MakeRESTRouter(
                rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
                        GetBlockHandler{kc, t}).Methods("GET", "HEAD")
                rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+
+               // List all blocks
+               rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
+
+               // List blocks whose hash has the given prefix
+               rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
        }
 
        if enable_put {
@@ -295,6 +280,8 @@ var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header
 var ContentLengthMismatch = errors.New("Actual length != expected content length")
 var MethodNotSupported = errors.New("Method not supported")
 
+var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)")
+
 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        SetCorsHeaders(resp)
 
@@ -315,7 +302,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
        var pass bool
        var tok string
-       if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+       if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
                status, err = http.StatusForbidden, BadAuthorizationHeader
                return
        }
@@ -327,6 +314,8 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
        var reader io.ReadCloser
 
+       locator = removeHint.ReplaceAllString(locator, "$1")
+
        switch req.Method {
        case "HEAD":
                expectLength, proxiedURI, err = kc.Ask(locator)
@@ -344,7 +333,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
        }
 
-       switch err {
+       switch respErr := err.(type) {
        case nil:
                status = http.StatusOK
                resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
@@ -357,10 +346,16 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                                err = ContentLengthMismatch
                        }
                }
-       case keepclient.BlockNotFound:
-               status = http.StatusNotFound
+       case keepclient.Error:
+               if respErr == keepclient.BlockNotFound {
+                       status = http.StatusNotFound
+               } else if respErr.Temporary() {
+                       status = http.StatusBadGateway
+               } else {
+                       status = 422
+               }
        default:
-               status = http.StatusBadGateway
+               status = http.StatusInternalServerError
        }
 }
 
@@ -400,15 +395,21 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                return
        }
 
-       if loc := keepclient.MakeLocator(locatorIn); loc.Size > 0 && int64(loc.Size) != expectLength {
-               err = LengthMismatchError
-               status = http.StatusBadRequest
-               return
+       if locatorIn != "" {
+               var loc *keepclient.Locator
+               if loc, err = keepclient.MakeLocator(locatorIn); err != nil {
+                       status = http.StatusBadRequest
+                       return
+               } else if loc.Size > 0 && int64(loc.Size) != expectLength {
+                       err = LengthMismatchError
+                       status = http.StatusBadRequest
+                       return
+               }
        }
 
        var pass bool
        var tok string
-       if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
+       if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
                err = BadAuthorizationHeader
                status = http.StatusForbidden
                return
@@ -468,3 +469,63 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                status = http.StatusBadGateway
        }
 }
+
+// ServeHTTP implementation for IndexHandler
+// Supports only GET requests for /index/{prefix:[0-9a-f]{0,32}}
+// For each keep server found in LocalRoots:
+//   Invokes GetIndex using keepclient
+//   Expects "complete" response (terminating with blank new line)
+//   Aborts on any errors
+// Concatenates responses from all those keep servers and returns
+func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       SetCorsHeaders(resp)
+
+       prefix := mux.Vars(req)["prefix"]
+       var err error
+       var status int
+
+       defer func() {
+               if status != http.StatusOK {
+                       http.Error(resp, err.Error(), status)
+               }
+       }()
+
+       kc := *handler.KeepClient
+
+       ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req)
+       if !ok {
+               status, err = http.StatusForbidden, BadAuthorizationHeader
+               return
+       }
+
+       // Copy ArvadosClient struct and use the client's API token
+       arvclient := *kc.Arvados
+       arvclient.ApiToken = token
+       kc.Arvados = &arvclient
+
+       // Only GET method is supported
+       if req.Method != "GET" {
+               status, err = http.StatusNotImplemented, MethodNotSupported
+               return
+       }
+
+       // Get index from all LocalRoots and write to resp
+       var reader io.Reader
+       for uuid := range kc.LocalRoots() {
+               reader, err = kc.GetIndex(uuid, prefix)
+               if err != nil {
+                       status = http.StatusBadGateway
+                       return
+               }
+
+               _, err = io.Copy(resp, reader)
+               if err != nil {
+                       status = http.StatusBadGateway
+                       return
+               }
+       }
+
+       // Got index from all the keep servers and wrote to resp
+       status = http.StatusOK
+       resp.Write([]byte("\n"))
+}