X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c48ed6487b9148269f25f6733a1d4f1860b8ea72..28ac7d4d181a0a9678099e579310d463c5057f06:/services/keepproxy/keepproxy.go diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go index 0b17d97756..c6fd99b9d8 100644 --- a/services/keepproxy/keepproxy.go +++ b/services/keepproxy/keepproxy.go @@ -26,10 +26,10 @@ import ( "git.curoverse.com/arvados.git/sdk/go/health" "git.curoverse.com/arvados.git/sdk/go/httpserver" "git.curoverse.com/arvados.git/sdk/go/keepclient" - log "github.com/Sirupsen/logrus" "github.com/coreos/go-systemd/daemon" "github.com/ghodss/yaml" "github.com/gorilla/mux" + log "github.com/sirupsen/logrus" ) var version = "dev" @@ -152,7 +152,7 @@ func main() { } err = f.Sync() if err != nil { - log.Fatal("sync(%s): %s", cfg.PIDFile, err) + log.Fatalf("sync(%s): %s", cfg.PIDFile, err) } } @@ -182,7 +182,7 @@ func main() { // Start serving requests. router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken) - http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router))) + http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(nil, router))) log.Println("shutting down") } @@ -257,6 +257,7 @@ func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, r var err error arv := *kc.Arvados arv.ApiToken = tok + arv.RequestID = req.Header.Get("X-Request-Id") if op == "read" { err = arv.Call("HEAD", "keep_services", "", "accessible", nil, nil) } else { @@ -273,6 +274,14 @@ func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, r return true, tok } +// We need to make a private copy of the default http transport early +// in initialization, then make copies of our private copy later. It +// won't be safe to copy http.DefaultTransport itself later, because +// its private mutexes might have already been used. (Without this, +// the test suite sometimes panics "concurrent map writes" in +// net/http.(*Transport).removeIdleConnLocked().) +var defaultTransport = *(http.DefaultTransport.(*http.Transport)) + type proxyHandler struct { http.Handler *keepclient.KeepClient @@ -286,7 +295,7 @@ type proxyHandler struct { func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient, timeout time.Duration, mgmtToken string) http.Handler { rest := mux.NewRouter() - transport := *(http.DefaultTransport.(*http.Transport)) + transport := defaultTransport transport.DialContext = (&net.Dialer{ Timeout: keepclient.DefaultConnectTimeout, KeepAlive: keepclient.DefaultKeepAlive, @@ -478,6 +487,15 @@ func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) { locatorIn := mux.Vars(req)["locator"] + // Check if the client specified storage classes + if req.Header.Get("X-Keep-Storage-Classes") != "" { + var scl []string + for _, sc := range strings.Split(req.Header.Get("X-Keep-Storage-Classes"), ",") { + scl = append(scl, strings.Trim(sc, " ")) + } + kc.StorageClasses = scl + } + _, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength) if err != nil || expectLength < 0 { err = LengthRequiredError @@ -523,7 +541,7 @@ func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) { if locatorIn == "" { bytes, err2 := ioutil.ReadAll(req.Body) if err2 != nil { - _ = errors.New(fmt.Sprintf("Error reading request body: %s", err2)) + err = fmt.Errorf("Error reading request body: %s", err2) status = http.StatusInternalServerError return } @@ -621,13 +639,13 @@ func (h *proxyHandler) Index(resp http.ResponseWriter, req *http.Request) { func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient { kc := *h.KeepClient + kc.RequestID = req.Header.Get("X-Request-Id") kc.HTTPClient = &proxyClient{ client: &http.Client{ Timeout: h.timeout, Transport: h.transport, }, - proto: req.Proto, - requestID: req.Header.Get("X-Request-Id"), + proto: req.Proto, } return &kc }