X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/04b9d6e93ca8bd18dca697e56689820516c8c572..8a27fe370239ecb8e50d53f46b45ed61203a35ca:/services/keepstore/proxy_remote.go diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go index 2e3d663519..526bc25299 100644 --- a/services/keepstore/proxy_remote.go +++ b/services/keepstore/proxy_remote.go @@ -2,18 +2,22 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( + "context" + "errors" "io" "net/http" + "regexp" "strings" "sync" + "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/auth" - "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/auth" + "git.arvados.org/arvados.git/sdk/go/keepclient" ) type remoteProxy struct { @@ -21,7 +25,36 @@ type remoteProxy struct { mtx sync.Mutex } -func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) { +func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster, volmgr *RRVolumeManager) { + // Intervening proxies must not return a cached GET response + // to a prior request if a X-Keep-Signature request header has + // been added or changed. + w.Header().Add("Vary", "X-Keep-Signature") + + token := GetAPIToken(r) + if token == "" { + http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized) + return + } + if strings.SplitN(r.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" { + buf, err := getBufferWithContext(ctx, bufs, BlockSize) + if err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + defer bufs.Put(buf) + rrc := &remoteResponseCacher{ + Locator: r.URL.Path[1:], + Token: token, + Buffer: buf[:0], + ResponseWriter: w, + Context: ctx, + Cluster: cluster, + VolumeManager: volmgr, + } + defer rrc.Close() + w = rrc + } var remoteClient *keepclient.KeepClient var parts []string for i, part := range strings.Split(r.URL.Path[1:], "+") { @@ -35,12 +68,7 @@ func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arva remoteID := part[1:6] remote, ok := cluster.RemoteClusters[remoteID] if !ok { - http.Error(w, "remote cluster not configured", http.StatusBadGateway) - return - } - token := GetAPIToken(r) - if token == "" { - http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized) + http.Error(w, "remote cluster not configured", http.StatusBadRequest) return } kc, err := rp.remoteClient(remoteID, remote, token) @@ -111,3 +139,73 @@ func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.Remot kccopy.Arvados.ApiToken = token return &kccopy, nil } + +var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`) + +// remoteResponseCacher wraps http.ResponseWriter. It buffers the +// response data in the provided buffer, writes/touches a copy on a +// local volume, adds a response header with a locally-signed locator, +// and finally writes the data through. +type remoteResponseCacher struct { + Locator string + Token string + Buffer []byte + Context context.Context + Cluster *arvados.Cluster + VolumeManager *RRVolumeManager + http.ResponseWriter + statusCode int +} + +func (rrc *remoteResponseCacher) Write(p []byte) (int, error) { + if len(rrc.Buffer)+len(p) > cap(rrc.Buffer) { + return 0, errors.New("buffer full") + } + rrc.Buffer = append(rrc.Buffer, p...) + return len(p), nil +} + +func (rrc *remoteResponseCacher) WriteHeader(statusCode int) { + rrc.statusCode = statusCode +} + +func (rrc *remoteResponseCacher) Close() error { + if rrc.statusCode == 0 { + rrc.statusCode = http.StatusOK + } else if rrc.statusCode != http.StatusOK { + rrc.ResponseWriter.WriteHeader(rrc.statusCode) + rrc.ResponseWriter.Write(rrc.Buffer) + return nil + } + _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil) + if rrc.Context.Err() != nil { + // If caller hung up, log that instead of subsequent/misleading errors. + http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout) + return err + } + if err == RequestHashError { + http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway) + return err + } + if err, ok := err.(*KeepError); ok { + http.Error(rrc.ResponseWriter, err.Error(), err.HTTPCode) + return err + } + if err != nil { + http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway) + return err + } + + unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "") + expiry := time.Now().Add(rrc.Cluster.Collections.BlobSigningTTL.Duration()) + signed := SignLocator(rrc.Cluster, unsigned, rrc.Token, expiry) + if signed == unsigned { + err = errors.New("could not sign locator") + http.Error(rrc.ResponseWriter, err.Error(), http.StatusInternalServerError) + return err + } + rrc.Header().Set("X-Keep-Locator", signed) + rrc.ResponseWriter.WriteHeader(rrc.statusCode) + _, err = rrc.ResponseWriter.Write(rrc.Buffer) + return err +}