From: Tom Clegg Date: Thu, 27 Apr 2017 14:51:49 +0000 (-0400) Subject: 11537: Add Via header to proxied keepstore requests. X-Git-Tag: 1.1.0~282^2~1 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/72900c01e197d602e79fda8d306b17fd1e32a3ea 11537: Add Via header to proxied keepstore requests. Propagate keepclient PUT error messages to caller. --- diff --git a/sdk/go/keepclient/discover.go b/sdk/go/keepclient/discover.go index 2892031817..f3e3960698 100644 --- a/sdk/go/keepclient/discover.go +++ b/sdk/go/keepclient/discover.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "net/http" "os" "os/signal" "reflect" @@ -22,7 +23,9 @@ func (this *KeepClient) DiscoverKeepServers() error { if this.Arvados.KeepServiceURIs != nil { this.foundNonDiskSvc = true this.replicasPerService = 0 - this.setClientSettingsNonDisk() + if c, ok := this.Client.(*http.Client); ok { + this.setClientSettingsNonDisk(c) + } roots := make(map[string]string) for i, uri := range this.Arvados.KeepServiceURIs { roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri @@ -134,10 +137,12 @@ func (this *KeepClient) loadKeepServers(list svcList) error { gatewayRoots[service.Uuid] = url } - if this.foundNonDiskSvc { - this.setClientSettingsNonDisk() - } else { - this.setClientSettingsDisk() + if client, ok := this.Client.(*http.Client); ok { + if this.foundNonDiskSvc { + this.setClientSettingsNonDisk(client) + } else { + this.setClientSettingsDisk(client) + } } this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots) diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index 4f84afca61..b56cc7f724 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -6,8 +6,6 @@ import ( "crypto/md5" "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/streamer" "io" "io/ioutil" "net/http" @@ -15,6 +13,9 @@ import ( "strconv" "strings" "sync" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/streamer" ) // A Keep "block" is 64MB. @@ -47,8 +48,11 @@ type ErrNotFound struct { multipleResponseError } -var InsufficientReplicasError = errors.New("Could not write sufficient replicas") -var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")") +type InsufficientReplicasError error + +type OversizeBlockError error + +var ErrOversizeBlock = OversizeBlockError(errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")) var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST") var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN") var InvalidLocatorError = errors.New("Invalid locator") @@ -62,6 +66,10 @@ var ErrIncompleteIndex = errors.New("Got incomplete index") const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas" const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored" +type HTTPClient interface { + Do(*http.Request) (*http.Response, error) +} + // Information about Arvados and Keep servers. type KeepClient struct { Arvados *arvadosclient.ArvadosClient @@ -70,7 +78,7 @@ type KeepClient struct { writableLocalRoots *map[string]string gatewayRoots *map[string]string lock sync.RWMutex - Client *http.Client + Client HTTPClient Retries int BlockCache *BlockCache @@ -115,14 +123,14 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient { // Returns the locator for the written block, the number of replicas // written, and an error. // -// Returns an InsufficientReplicas error if 0 <= replicas < +// Returns an InsufficientReplicasError if 0 <= replicas < // kc.Wants_replicas. func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) { // Buffer for reads from 'r' var bufsize int if dataBytes > 0 { if dataBytes > BLOCKSIZE { - return "", 0, OversizeBlockError + return "", 0, ErrOversizeBlock } bufsize = int(dataBytes) } else { diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index f0da600c24..fcae4131fc 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -2,11 +2,8 @@ package keepclient import ( "crypto/md5" + "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/arvadostest" - "git.curoverse.com/arvados.git/sdk/go/streamer" - . "gopkg.in/check.v1" "io" "io/ioutil" "log" @@ -16,6 +13,11 @@ import ( "strings" "testing" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/arvadostest" + "git.curoverse.com/arvados.git/sdk/go/streamer" + . "gopkg.in/check.v1" ) // Gocheck boilerplate @@ -435,7 +437,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, Equals, InsufficientReplicasError) + c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, ks1[0].url) } @@ -921,7 +923,7 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) { _, replicas, err := kc.PutB([]byte("foo")) <-st.handled - c.Check(err, Equals, InsufficientReplicasError) + c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) c.Check(replicas, Equals, 2) } @@ -996,7 +998,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, Equals, InsufficientReplicasError) + c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) c.Check(replicas, Equals, 1) c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)]) @@ -1031,7 +1033,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) { _, replicas, err := kc.PutB([]byte("foo")) - c.Check(err, Equals, InsufficientReplicasError) + c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New(""))) c.Check(replicas, Equals, 0) } @@ -1263,5 +1265,5 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) { c.Assert(kc.replicasPerService, Equals, 0) c.Assert(kc.foundNonDiskSvc, Equals, true) - c.Assert(kc.Client.Timeout, Equals, 300*time.Second) + c.Assert(kc.Client.(*http.Client).Timeout, Equals, 300*time.Second) } diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go index 9adbb4878f..33ba8720bc 100644 --- a/sdk/go/keepclient/support.go +++ b/sdk/go/keepclient/support.go @@ -4,7 +4,6 @@ import ( "crypto/md5" "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/streamer" "io" "io/ioutil" "log" @@ -15,6 +14,8 @@ import ( "regexp" "strings" "time" + + "git.curoverse.com/arvados.git/sdk/go/streamer" ) // Function used to emit debug messages. The easiest way to enable @@ -45,49 +46,45 @@ func Md5String(s string) string { // Set timeouts applicable when connecting to non-disk services // (assumed to be over the Internet). -func (this *KeepClient) setClientSettingsNonDisk() { - if this.Client.Timeout == 0 { - // Maximum time to wait for a complete response - this.Client.Timeout = 300 * time.Second - - // TCP and TLS connection settings - this.Client.Transport = &http.Transport{ - Dial: (&net.Dialer{ - // The maximum time to wait to set up - // the initial TCP connection. - Timeout: 30 * time.Second, - - // The TCP keep alive heartbeat - // interval. - KeepAlive: 120 * time.Second, - }).Dial, - - TLSHandshakeTimeout: 10 * time.Second, - } +func (*KeepClient) setClientSettingsNonDisk(client *http.Client) { + // Maximum time to wait for a complete response + client.Timeout = 300 * time.Second + + // TCP and TLS connection settings + client.Transport = &http.Transport{ + Dial: (&net.Dialer{ + // The maximum time to wait to set up + // the initial TCP connection. + Timeout: 30 * time.Second, + + // The TCP keep alive heartbeat + // interval. + KeepAlive: 120 * time.Second, + }).Dial, + + TLSHandshakeTimeout: 10 * time.Second, } } // Set timeouts applicable when connecting to keepstore services directly // (assumed to be on the local network). -func (this *KeepClient) setClientSettingsDisk() { - if this.Client.Timeout == 0 { - // Maximum time to wait for a complete response - this.Client.Timeout = 20 * time.Second - - // TCP and TLS connection timeouts - this.Client.Transport = &http.Transport{ - Dial: (&net.Dialer{ - // The maximum time to wait to set up - // the initial TCP connection. - Timeout: 2 * time.Second, - - // The TCP keep alive heartbeat - // interval. - KeepAlive: 180 * time.Second, - }).Dial, - - TLSHandshakeTimeout: 4 * time.Second, - } +func (*KeepClient) setClientSettingsDisk(client *http.Client) { + // Maximum time to wait for a complete response + client.Timeout = 20 * time.Second + + // TCP and TLS connection timeouts + client.Transport = &http.Transport{ + Dial: (&net.Dialer{ + // The maximum time to wait to set up + // the initial TCP connection. + Timeout: 2 * time.Second, + + // The TCP keep alive heartbeat + // interval. + KeepAlive: 180 * time.Second, + }).Dial, + + TLSHandshakeTimeout: 4 * time.Second, } } @@ -157,6 +154,9 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea DebugPrintf("DEBUG: [%08x] Upload %v success", requestID, url) upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response} } else { + if resp.StatusCode >= 300 && response == "" { + response = resp.Status + } DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, resp.StatusCode, response) upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response} } @@ -206,6 +206,8 @@ func (this *KeepClient) putReplicas( retriesRemaining := 1 + this.Retries var retryServers []string + lastError := make(map[string]string) + for retriesRemaining > 0 { retriesRemaining -= 1 next_server = 0 @@ -220,7 +222,12 @@ func (this *KeepClient) putReplicas( active += 1 } else { if active == 0 && retriesRemaining == 0 { - return locator, replicasDone, InsufficientReplicasError + msg := "Could not write sufficient replicas: " + for _, resp := range lastError { + msg += resp + "; " + } + msg = msg[:len(msg)-2] + return locator, replicasDone, InsufficientReplicasError(errors.New(msg)) } else { break } @@ -239,7 +246,16 @@ func (this *KeepClient) putReplicas( replicasDone += status.replicas_stored replicasTodo -= status.replicas_stored locator = status.response - } else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 || + delete(lastError, status.url) + } else { + msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response) + if len(msg) > 100 { + msg = msg[:100] + } + lastError[status.url] = msg + } + + if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 || (status.statusCode >= 500 && status.statusCode != 503) { // Timeout, too many requests, or other server side failure // Do not retry when status code is 503, which means the keep server is full diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go index 620ed9cfb4..5e3e4afdb4 100644 --- a/services/keep-web/handler.go +++ b/services/keep-web/handler.go @@ -335,9 +335,9 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) { statusCode, statusText = http.StatusInternalServerError, err.Error() return } - if kc.Client != nil && kc.Client.Transport != nil { + if client, ok := kc.Client.(*http.Client); ok && client.Transport != nil { // Workaround for https://dev.arvados.org/issues/9005 - if t, ok := kc.Client.Transport.(*http.Transport); ok { + if t, ok := client.Transport.(*http.Transport); ok { defer t.CloseIdleConnections() } } diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go index 76a8a1551f..7a673aeba9 100644 --- a/services/keepproxy/keepproxy.go +++ b/services/keepproxy/keepproxy.go @@ -12,6 +12,7 @@ import ( "os" "os/signal" "regexp" + "strings" "sync" "syscall" "time" @@ -43,7 +44,10 @@ func DefaultConfig() *Config { } } -var listener net.Listener +var ( + listener net.Listener + router http.Handler +) func main() { cfg := DefaultConfig() @@ -129,7 +133,7 @@ func main() { if cfg.DefaultReplicas > 0 { kc.Want_replicas = cfg.DefaultReplicas } - kc.Client.Timeout = time.Duration(cfg.Timeout) + kc.Client.(*http.Client).Timeout = time.Duration(cfg.Timeout) go kc.RefreshServices(5*time.Minute, 3*time.Second) listener, err = net.Listen("tcp", cfg.Listen) @@ -153,7 +157,8 @@ func main() { signal.Notify(term, syscall.SIGINT) // Start serving requests. - http.Serve(listener, MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc)) + router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc) + http.Serve(listener, router) log.Println("shutting down") } @@ -232,61 +237,57 @@ func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, r return true, tok } -type GetBlockHandler struct { +type proxyHandler struct { + http.Handler *keepclient.KeepClient *ApiTokenCache } -type PutBlockHandler struct { - *keepclient.KeepClient - *ApiTokenCache -} - -type IndexHandler struct { - *keepclient.KeepClient - *ApiTokenCache -} - -type InvalidPathHandler struct{} - -type OptionsHandler struct{} - -// MakeRESTRouter -// Returns a mux.Router that passes GET and PUT requests to the -// appropriate handlers. -// -func MakeRESTRouter( - enable_get bool, - enable_put bool, - kc *keepclient.KeepClient) *mux.Router { - - t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300} - +// MakeRESTRouter returns an http.Handler that passes GET and PUT +// requests to the appropriate handlers. +func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient) http.Handler { rest := mux.NewRouter() + h := &proxyHandler{ + Handler: rest, + KeepClient: kc, + ApiTokenCache: &ApiTokenCache{ + tokens: make(map[string]int64), + expireTime: 300, + }, + } if enable_get { - rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, - GetBlockHandler{kc, t}).Methods("GET", "HEAD") - rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD") + rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Get).Methods("GET", "HEAD") + rest.HandleFunc(`/{locator:[0-9a-f]{32}}`, h.Get).Methods("GET", "HEAD") // List all blocks - rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET") + rest.HandleFunc(`/index`, h.Index).Methods("GET") // List blocks whose hash has the given prefix - rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET") + rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, h.Index).Methods("GET") } if enable_put { - rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT") - rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT") - rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST") - rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS") - rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS") + rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Put).Methods("PUT") + rest.HandleFunc(`/{locator:[0-9a-f]{32}}`, h.Put).Methods("PUT") + rest.HandleFunc(`/`, h.Put).Methods("POST") + rest.HandleFunc(`/{any}`, h.Options).Methods("OPTIONS") + rest.HandleFunc(`/`, h.Options).Methods("OPTIONS") } rest.NotFoundHandler = InvalidPathHandler{} + return h +} - return rest +var errLoopDetected = errors.New("loop detected") + +func (*proxyHandler) checkLoop(resp http.ResponseWriter, req *http.Request) error { + if via := req.Header.Get("Via"); strings.Index(via, " "+viaAlias) >= 0 { + log.Printf("proxy loop detected (request has Via: %q): perhaps keepproxy is misidentified by gateway config as an external client, or its keep_services record does not have service_type=proxy?", via) + http.Error(resp, errLoopDetected.Error(), http.StatusInternalServerError) + return errLoopDetected + } + return nil } func SetCorsHeaders(resp http.ResponseWriter) { @@ -296,12 +297,14 @@ func SetCorsHeaders(resp http.ResponseWriter) { resp.Header().Set("Access-Control-Max-Age", "86486400") } -func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +type InvalidPathHandler struct{} + +func (InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path) http.Error(resp, "Bad request", http.StatusBadRequest) } -func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h *proxyHandler) Options(resp http.ResponseWriter, req *http.Request) { log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path) SetCorsHeaders(resp) } @@ -312,7 +315,10 @@ var MethodNotSupported = errors.New("Method not supported") var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)") -func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) { + if err := h.checkLoop(resp, req); err != nil { + return + } SetCorsHeaders(resp) locator := mux.Vars(req)["locator"] @@ -328,11 +334,12 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques } }() - kc := *this.KeepClient + kc := *h.KeepClient + kc.Client = &proxyClient{client: kc.Client, proto: req.Proto} var pass bool var tok string - if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass { + if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass { status, err = http.StatusForbidden, BadAuthorizationHeader return } @@ -392,10 +399,15 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired)) var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header") -func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) { + if err := h.checkLoop(resp, req); err != nil { + return + } SetCorsHeaders(resp) - kc := *this.KeepClient + kc := *h.KeepClient + kc.Client = &proxyClient{client: kc.Client, proto: req.Proto} + var err error var expectLength int64 var status = http.StatusInternalServerError @@ -432,7 +444,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques var pass bool var tok string - if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass { + if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass { err = BadAuthorizationHeader status = http.StatusForbidden return @@ -468,7 +480,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques // Tell the client how many successful PUTs we accomplished resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas)) - switch err { + switch err.(type) { case nil: status = http.StatusOK _, err = io.WriteString(resp, locatorOut) @@ -500,7 +512,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques // Expects "complete" response (terminating with blank new line) // Aborts on any errors // Concatenates responses from all those keep servers and returns -func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { +func (h *proxyHandler) Index(resp http.ResponseWriter, req *http.Request) { SetCorsHeaders(resp) prefix := mux.Vars(req)["prefix"] @@ -513,9 +525,9 @@ func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques } }() - kc := *handler.KeepClient + kc := *h.KeepClient - ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req) + ok, token := CheckAuthorizationHeader(&kc, h.ApiTokenCache, req) if !ok { status, err = http.StatusForbidden, BadAuthorizationHeader return diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go index 6a349dae21..e4ba22a5c4 100644 --- a/services/keepproxy/keepproxy_test.go +++ b/services/keepproxy/keepproxy_test.go @@ -3,10 +3,8 @@ package main import ( "bytes" "crypto/md5" + "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/arvadostest" - "git.curoverse.com/arvados.git/sdk/go/keepclient" "io/ioutil" "log" "net/http" @@ -16,6 +14,10 @@ import ( "testing" "time" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/arvadostest" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + . "gopkg.in/check.v1" ) @@ -111,6 +113,24 @@ func runProxy(c *C, args []string, bogusClientToken bool) *keepclient.KeepClient return kc } +func (s *ServerRequiredSuite) TestLoopDetection(c *C) { + kc := runProxy(c, nil, false) + defer closeListener() + + sr := map[string]string{ + TestProxyUUID: "http://" + listener.Addr().String(), + } + router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr) + + content := []byte("TestLoopDetection") + _, _, err := kc.PutB(content) + c.Check(err, ErrorMatches, `.*loop detected.*`) + + hash := fmt.Sprintf("%x", md5.Sum(content)) + _, _, _, err = kc.Get(hash) + c.Check(err, ErrorMatches, `.*loop detected.*`) +} + func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) { kc := runProxy(c, nil, false) defer closeListener() @@ -260,7 +280,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) { hash2, rep, err := kc.PutB([]byte("bar")) c.Check(hash2, Equals, "") c.Check(rep, Equals, 0) - c.Check(err, Equals, keepclient.InsufficientReplicasError) + c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New(""))) log.Print("PutB") } @@ -331,7 +351,7 @@ func (s *ServerRequiredSuite) TestPutDisabled(c *C) { hash2, rep, err := kc.PutB([]byte("quux")) c.Check(hash2, Equals, "") c.Check(rep, Equals, 0) - c.Check(err, Equals, keepclient.InsufficientReplicasError) + c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New(""))) } func (s *ServerRequiredSuite) TestCorsHeaders(c *C) { diff --git a/services/keepproxy/proxy_client.go b/services/keepproxy/proxy_client.go new file mode 100644 index 0000000000..2b25de247e --- /dev/null +++ b/services/keepproxy/proxy_client.go @@ -0,0 +1,19 @@ +package main + +import ( + "net/http" + + "git.curoverse.com/arvados.git/sdk/go/keepclient" +) + +var viaAlias = "keepproxy" + +type proxyClient struct { + client keepclient.HTTPClient + proto string +} + +func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) { + req.Header.Add("Via", pc.proto+" "+viaAlias) + return pc.client.Do(req) +} diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go index 6d791bf987..706664ce28 100644 --- a/tools/keep-exercise/keep-exercise.go +++ b/tools/keep-exercise/keep-exercise.go @@ -21,6 +21,7 @@ import ( "io" "io/ioutil" "log" + "net/http" "time" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" @@ -52,7 +53,7 @@ func main() { log.Fatal(err) } kc.Want_replicas = *Replicas - kc.Client.Timeout = 10 * time.Minute + kc.Client.(*http.Client).Timeout = 10 * time.Minute overrideServices(kc)