X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a8749c6b2c7d339c3355dd130ca37c9c876f72a5..8191d7365a61f4d4309c2f0b387188303320a889:/services/keep/src/arvados.org/keepproxy/keepproxy.go diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go index 38e14fd283..42f5a78231 100644 --- a/services/keep/src/arvados.org/keepproxy/keepproxy.go +++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go @@ -10,7 +10,9 @@ import ( "net" "net/http" "os" + "os/signal" "sync" + "syscall" "time" ) @@ -89,10 +91,37 @@ func main() { go RefreshServicesList(&kc) + // Shut down the server gracefully (by closing the listener) + // if SIGTERM is received. + term := make(chan os.Signal, 1) + go func(sig <-chan os.Signal) { + s := <-sig + log.Println("caught signal:", s) + listener.Close() + }(term) + signal.Notify(term, syscall.SIGTERM) + signal.Notify(term, syscall.SIGINT) + + if pidfile != "" { + f, err := os.Create(pidfile) + if err == nil { + fmt.Fprint(f, os.Getpid()) + f.Close() + } else { + log.Printf("Error writing pid file (%s): %s", pidfile, err.Error()) + } + } + log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots()) // Start listening for requests. http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc)) + + log.Println("shutting down") + + if pidfile != "" { + os.Remove(pidfile) + } } type ApiTokenCache struct { @@ -233,16 +262,13 @@ func MakeRESTRouter( rest := mux.NewRouter() if enable_get { - gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}) - ghsig := rest.Handle( - `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`, - GetBlockHandler{kc, t}) - - gh.Methods("GET", "HEAD") - ghsig.Methods("GET", "HEAD") + rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, + GetBlockHandler{kc, t}).Methods("GET", "HEAD") + rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD") } if enable_put { + rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT") rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT") } @@ -261,8 +287,9 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques kc := *this.KeepClient hash := mux.Vars(req)["hash"] - signature := mux.Vars(req)["signature"] - timestamp := mux.Vars(req)["timestamp"] + hints := mux.Vars(req)["hints"] + + locator := keepclient.MakeLocator2(hash, hints) log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash) @@ -276,10 +303,10 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques var blocklen int64 if req.Method == "GET" { - reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp) + reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp) defer reader.Close() } else if req.Method == "HEAD" { - blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp) + blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp) } resp.Header().Set("Content-Length", fmt.Sprint(blocklen)) @@ -314,6 +341,9 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques kc := *this.KeepClient hash := mux.Vars(req)["hash"] + hints := mux.Vars(req)["hints"] + + locator := keepclient.MakeLocator2(hash, hints) var contentLength int64 = -1 if req.Header.Get("Content-Length") != "" { @@ -331,6 +361,11 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques return } + if locator.Size > 0 && int64(locator.Size) != contentLength { + http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest) + return + } + if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) { http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden) return @@ -346,7 +381,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques } // Now try to put the block through - replicas, err := kc.PutHR(hash, req.Body, contentLength) + hash, replicas, err := kc.PutHR(hash, req.Body, contentLength) // Tell the client how many successful PUTs we accomplished resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas)) @@ -355,6 +390,10 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques case nil: // Default will return http.StatusOK log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas) + n, err2 := io.WriteString(resp, hash) + if err2 != nil { + log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error()) + } case keepclient.OversizeBlockError: // Too much data @@ -366,6 +405,10 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques // client can decide if getting less than the number of // replications it asked for is a fatal error. // Default will return http.StatusOK + n, err2 := io.WriteString(resp, hash) + if err2 != nil { + log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error()) + } } else { http.Error(resp, "", http.StatusServiceUnavailable) }