X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fde27ce0e46521db9828c228e7fb531e003724a8..1dfede4c84c961230954a4dad9ab348029dfa128:/services/keepstore/proxy_remote.go diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go index 9f4a8ef1a1..1f82f3f4fc 100644 --- a/services/keepstore/proxy_remote.go +++ b/services/keepstore/proxy_remote.go @@ -36,7 +36,7 @@ func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.R http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized) return } - if sign := r.Header.Get("X-Keep-Signature"); sign != "" { + if strings.SplitN(r.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" { buf, err := getBufferWithContext(ctx, bufs, BlockSize) if err != nil { http.Error(w, err.Error(), http.StatusServiceUnavailable) @@ -48,8 +48,9 @@ func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.R Token: token, Buffer: buf[:0], ResponseWriter: w, + Context: ctx, } - defer rrc.Flush(ctx) + defer rrc.Close() w = rrc } var remoteClient *keepclient.KeepClient @@ -65,7 +66,7 @@ func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.R remoteID := part[1:6] remote, ok := cluster.RemoteClusters[remoteID] if !ok { - http.Error(w, "remote cluster not configured", http.StatusBadGateway) + http.Error(w, "remote cluster not configured", http.StatusBadRequest) return } kc, err := rp.remoteClient(remoteID, remote, token) @@ -147,6 +148,7 @@ type remoteResponseCacher struct { Locator string Token string Buffer []byte + Context context.Context http.ResponseWriter statusCode int } @@ -163,35 +165,42 @@ func (rrc *remoteResponseCacher) WriteHeader(statusCode int) { rrc.statusCode = statusCode } -func (rrc *remoteResponseCacher) Flush(ctx context.Context) { +func (rrc *remoteResponseCacher) Close() error { if rrc.statusCode == 0 { rrc.statusCode = http.StatusOK } else if rrc.statusCode != http.StatusOK { rrc.ResponseWriter.WriteHeader(rrc.statusCode) rrc.ResponseWriter.Write(rrc.Buffer) - return + return nil + } + _, err := PutBlock(rrc.Context, rrc.Buffer, rrc.Locator[:32]) + if rrc.Context.Err() != nil { + // If caller hung up, log that instead of subsequent/misleading errors. + http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout) + return err } - _, err := PutBlock(ctx, rrc.Buffer, rrc.Locator[:32]) if err == RequestHashError { http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway) - return + return err } if err, ok := err.(*KeepError); ok { http.Error(rrc.ResponseWriter, err.Error(), err.HTTPCode) - return + return err } if err != nil { http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway) - return + return err } unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "") signed := SignLocator(unsigned, rrc.Token, time.Now().Add(theConfig.BlobSignatureTTL.Duration())) if signed == unsigned { - http.Error(rrc.ResponseWriter, "could not sign locator", http.StatusInternalServerError) - return + err = errors.New("could not sign locator") + http.Error(rrc.ResponseWriter, err.Error(), http.StatusInternalServerError) + return err } rrc.Header().Set("X-Keep-Locator", signed) rrc.ResponseWriter.WriteHeader(rrc.statusCode) - rrc.ResponseWriter.Write(rrc.Buffer) + _, err = rrc.ResponseWriter.Write(rrc.Buffer) + return err }