X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1e27ecf1368de5932c4af00c6cff9595c501f5f6..8a27fe370239ecb8e50d53f46b45ed61203a35ca:/services/keepstore/proxy_remote.go diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go index aaa1a0188e..526bc25299 100644 --- a/services/keepstore/proxy_remote.go +++ b/services/keepstore/proxy_remote.go @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( "context" @@ -14,10 +14,10 @@ import ( "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/auth" - "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/auth" + "git.arvados.org/arvados.git/sdk/go/keepclient" ) type remoteProxy struct { @@ -25,13 +25,18 @@ type remoteProxy struct { mtx sync.Mutex } -func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) { +func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster, volmgr *RRVolumeManager) { + // Intervening proxies must not return a cached GET response + // to a prior request if a X-Keep-Signature request header has + // been added or changed. + w.Header().Add("Vary", "X-Keep-Signature") + token := GetAPIToken(r) if token == "" { http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized) return } - if sign := r.Header.Get("X-Keep-Signature"); sign != "" { + if strings.SplitN(r.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" { buf, err := getBufferWithContext(ctx, bufs, BlockSize) if err != nil { http.Error(w, err.Error(), http.StatusServiceUnavailable) @@ -43,8 +48,11 @@ func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.R Token: token, Buffer: buf[:0], ResponseWriter: w, + Context: ctx, + Cluster: cluster, + VolumeManager: volmgr, } - defer rrc.Flush(ctx) + defer rrc.Close() w = rrc } var remoteClient *keepclient.KeepClient @@ -60,7 +68,7 @@ func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.R remoteID := part[1:6] remote, ok := cluster.RemoteClusters[remoteID] if !ok { - http.Error(w, "remote cluster not configured", http.StatusBadGateway) + http.Error(w, "remote cluster not configured", http.StatusBadRequest) return } kc, err := rp.remoteClient(remoteID, remote, token) @@ -139,9 +147,12 @@ var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`) // local volume, adds a response header with a locally-signed locator, // and finally writes the data through. type remoteResponseCacher struct { - Locator string - Token string - Buffer []byte + Locator string + Token string + Buffer []byte + Context context.Context + Cluster *arvados.Cluster + VolumeManager *RRVolumeManager http.ResponseWriter statusCode int } @@ -158,35 +169,43 @@ func (rrc *remoteResponseCacher) WriteHeader(statusCode int) { rrc.statusCode = statusCode } -func (rrc *remoteResponseCacher) Flush(ctx context.Context) { +func (rrc *remoteResponseCacher) Close() error { if rrc.statusCode == 0 { rrc.statusCode = http.StatusOK } else if rrc.statusCode != http.StatusOK { rrc.ResponseWriter.WriteHeader(rrc.statusCode) rrc.ResponseWriter.Write(rrc.Buffer) - return + return nil + } + _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil) + if rrc.Context.Err() != nil { + // If caller hung up, log that instead of subsequent/misleading errors. + http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout) + return err } - _, err := PutBlock(ctx, rrc.Buffer, rrc.Locator[:32]) if err == RequestHashError { http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway) - return + return err } if err, ok := err.(*KeepError); ok { http.Error(rrc.ResponseWriter, err.Error(), err.HTTPCode) - return + return err } if err != nil { http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway) - return + return err } unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "") - signed := SignLocator(unsigned, rrc.Token, time.Now().Add(theConfig.BlobSignatureTTL.Duration())) + expiry := time.Now().Add(rrc.Cluster.Collections.BlobSigningTTL.Duration()) + signed := SignLocator(rrc.Cluster, unsigned, rrc.Token, expiry) if signed == unsigned { - http.Error(rrc.ResponseWriter, "could not sign locator", http.StatusInternalServerError) - return + err = errors.New("could not sign locator") + http.Error(rrc.ResponseWriter, err.Error(), http.StatusInternalServerError) + return err } rrc.Header().Set("X-Keep-Locator", signed) rrc.ResponseWriter.WriteHeader(rrc.statusCode) - rrc.ResponseWriter.Write(rrc.Buffer) + _, err = rrc.ResponseWriter.Write(rrc.Buffer) + return err }