Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / proxy_remote.go
index 2e3d6635192622a02d4ee8f3a39d6713cd9cfbef..526bc25299373ba56d5a7d27f445551c940edcb7 100644 (file)
@@ -2,18 +2,22 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
+       "context"
+       "errors"
        "io"
        "net/http"
+       "regexp"
        "strings"
        "sync"
+       "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/auth"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
 )
 
 type remoteProxy struct {
@@ -21,7 +25,36 @@ type remoteProxy struct {
        mtx     sync.Mutex
 }
 
-func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) {
+func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster, volmgr *RRVolumeManager) {
+       // Intervening proxies must not return a cached GET response
+       // to a prior request if a X-Keep-Signature request header has
+       // been added or changed.
+       w.Header().Add("Vary", "X-Keep-Signature")
+
+       token := GetAPIToken(r)
+       if token == "" {
+               http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
+               return
+       }
+       if strings.SplitN(r.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" {
+               buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+               if err != nil {
+                       http.Error(w, err.Error(), http.StatusServiceUnavailable)
+                       return
+               }
+               defer bufs.Put(buf)
+               rrc := &remoteResponseCacher{
+                       Locator:        r.URL.Path[1:],
+                       Token:          token,
+                       Buffer:         buf[:0],
+                       ResponseWriter: w,
+                       Context:        ctx,
+                       Cluster:        cluster,
+                       VolumeManager:  volmgr,
+               }
+               defer rrc.Close()
+               w = rrc
+       }
        var remoteClient *keepclient.KeepClient
        var parts []string
        for i, part := range strings.Split(r.URL.Path[1:], "+") {
@@ -35,12 +68,7 @@ func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arva
                        remoteID := part[1:6]
                        remote, ok := cluster.RemoteClusters[remoteID]
                        if !ok {
-                               http.Error(w, "remote cluster not configured", http.StatusBadGateway)
-                               return
-                       }
-                       token := GetAPIToken(r)
-                       if token == "" {
-                               http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
+                               http.Error(w, "remote cluster not configured", http.StatusBadRequest)
                                return
                        }
                        kc, err := rp.remoteClient(remoteID, remote, token)
@@ -111,3 +139,73 @@ func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.Remot
        kccopy.Arvados.ApiToken = token
        return &kccopy, nil
 }
+
+var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`) // matches one "+A..." or "+R..." locator hint, up to (not including) the next "+"
+
+// remoteResponseCacher wraps http.ResponseWriter. Write and WriteHeader only
+// buffer the response data and status code; Close then writes/touches a copy
+// on a local volume, adds a response header with a locally-signed locator,
+// and finally writes the data through to the wrapped ResponseWriter.
+type remoteResponseCacher struct {
+       Locator       string // locator as requested; Close passes its first 32 characters to PutBlock and re-signs it without its +A/+R hints
+       Token         string // token passed to SignLocator when producing the X-Keep-Locator value
+       Buffer        []byte // accumulates the response body; Write refuses data beyond its capacity
+       Context       context.Context // passed to PutBlock; Close checks it for cancellation afterwards
+       Cluster       *arvados.Cluster // supplies Collections.BlobSigningTTL and is passed to SignLocator
+       VolumeManager *RRVolumeManager // passed to PutBlock
+       http.ResponseWriter // the wrapped writer; receives the final status, headers and body in Close
+       statusCode int // recorded by WriteHeader; 0 means WriteHeader was not called
+}
+
+func (rrc *remoteResponseCacher) Write(p []byte) (int, error) {
+       if len(rrc.Buffer)+len(p) > cap(rrc.Buffer) {
+               return 0, errors.New("buffer full")
+       }
+       rrc.Buffer = append(rrc.Buffer, p...)
+       return len(p), nil
+}
+
+func (rrc *remoteResponseCacher) WriteHeader(statusCode int) {
+       rrc.statusCode = statusCode
+}
+
+func (rrc *remoteResponseCacher) Close() error {
+       if rrc.statusCode == 0 {
+               rrc.statusCode = http.StatusOK
+       } else if rrc.statusCode != http.StatusOK {
+               rrc.ResponseWriter.WriteHeader(rrc.statusCode)
+               rrc.ResponseWriter.Write(rrc.Buffer)
+               return nil
+       }
+       _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil)
+       if rrc.Context.Err() != nil {
+               // If caller hung up, log that instead of subsequent/misleading errors.
+               http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout)
+               return err
+       }
+       if err == RequestHashError {
+               http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway)
+               return err
+       }
+       if err, ok := err.(*KeepError); ok {
+               http.Error(rrc.ResponseWriter, err.Error(), err.HTTPCode)
+               return err
+       }
+       if err != nil {
+               http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway)
+               return err
+       }
+
+       unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "")
+       expiry := time.Now().Add(rrc.Cluster.Collections.BlobSigningTTL.Duration())
+       signed := SignLocator(rrc.Cluster, unsigned, rrc.Token, expiry)
+       if signed == unsigned {
+               err = errors.New("could not sign locator")
+               http.Error(rrc.ResponseWriter, err.Error(), http.StatusInternalServerError)
+               return err
+       }
+       rrc.Header().Set("X-Keep-Locator", signed)
+       rrc.ResponseWriter.WriteHeader(rrc.statusCode)
+       _, err = rrc.ResponseWriter.Write(rrc.Buffer)
+       return err
+}