package main
import (
+ "context"
+ "errors"
"io"
"net/http"
+ "regexp"
"strings"
"sync"
+ "time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/auth"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
)
type remoteProxy struct {
mtx sync.Mutex
}
-func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) {
+func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster, volmgr *RRVolumeManager) {
+ // Intervening proxies must not return a cached GET response
+ // to a prior request if a X-Keep-Signature request header has
+ // been added or changed.
+ w.Header().Add("Vary", "X-Keep-Signature")
+
+ token := GetAPIToken(r)
+ if token == "" {
+ http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
+ return
+ }
+ if strings.SplitN(r.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" {
+ buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusServiceUnavailable)
+ return
+ }
+ defer bufs.Put(buf)
+ rrc := &remoteResponseCacher{
+ Locator: r.URL.Path[1:],
+ Token: token,
+ Buffer: buf[:0],
+ ResponseWriter: w,
+ Context: ctx,
+ Cluster: cluster,
+ VolumeManager: volmgr,
+ }
+ defer rrc.Close()
+ w = rrc
+ }
var remoteClient *keepclient.KeepClient
var parts []string
for i, part := range strings.Split(r.URL.Path[1:], "+") {
remoteID := part[1:6]
remote, ok := cluster.RemoteClusters[remoteID]
if !ok {
- http.Error(w, "remote cluster not configured", http.StatusBadGateway)
- return
- }
- token := GetAPIToken(r)
- if token == "" {
- http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
+ http.Error(w, "remote cluster not configured", http.StatusBadRequest)
return
}
kc, err := rp.remoteClient(remoteID, remote, token)
kccopy.Arvados.ApiToken = token
return &kccopy, nil
}
+
+// localOrRemoteSignature matches a locator hint that starts with "+A"
+// or "+R" and extends up to (not including) the next "+". Close uses
+// it to strip such hints from the requested locator before signing.
+var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`)
+
+// remoteResponseCacher wraps http.ResponseWriter. It buffers the
+// response data in the provided buffer, writes/touches a copy on a
+// local volume, adds a response header with a locally-signed locator,
+// and finally writes the data through.
+//
+// Write and WriteHeader only record the body and status code; they
+// are sent to the wrapped ResponseWriter when Close is called.
+type remoteResponseCacher struct {
+ // Locator is the requested block locator. Its first 32 bytes are
+ // passed to PutBlock (presumably the block hash -- confirm against
+ // PutBlock).
+ Locator string
+ // Token is the API token used to sign the locator returned in the
+ // X-Keep-Locator response header.
+ Token string
+ // Buffer accumulates the response body. Its capacity is the upper
+ // bound on the body size: Write fails rather than growing it.
+ Buffer []byte
+ // Context is passed to PutBlock and checked for cancellation in
+ // Close.
+ Context context.Context
+ // Cluster supplies Collections.BlobSigningTTL and is passed to
+ // SignLocator.
+ Cluster *arvados.Cluster
+ // VolumeManager is passed to PutBlock when storing the local copy.
+ VolumeManager *RRVolumeManager
+ // ResponseWriter is the wrapped writer that receives the final
+ // response from Close.
+ http.ResponseWriter
+ // statusCode is the status recorded by WriteHeader. Zero means
+ // WriteHeader was never called; Close treats that as 200 OK.
+ statusCode int
+}
+
+// Write appends p to the in-memory buffer instead of sending it to
+// the wrapped ResponseWriter. The buffer is never grown: if p does
+// not fit in the remaining capacity, nothing is stored and a "buffer
+// full" error is returned.
+func (rrc *remoteResponseCacher) Write(p []byte) (int, error) {
+	n := len(p)
+	if room := cap(rrc.Buffer) - len(rrc.Buffer); n > room {
+		return 0, errors.New("buffer full")
+	}
+	rrc.Buffer = append(rrc.Buffer, p...)
+	return n, nil
+}
+
+// WriteHeader records statusCode without sending it to the wrapped
+// ResponseWriter; Close sends it once the final status is known.
+func (rrc *remoteResponseCacher) WriteHeader(statusCode int) {
+ rrc.statusCode = statusCode
+}
+
+// Close sends the buffered response to the wrapped ResponseWriter.
+//
+// If the recorded status is anything other than 200 OK, the status
+// and buffered body are passed through unchanged. Otherwise the
+// buffered data is stored with PutBlock, the locator (with any +A/+R
+// hints removed) is signed with the caller's token, the signed
+// locator is returned in the X-Keep-Locator header, and the body is
+// written through.
+//
+// If any step fails, an HTTP error is sent to the client instead of
+// the buffered data. The returned error is non-nil whenever the
+// client did not receive the intended response.
+func (rrc *remoteResponseCacher) Close() error {
+	if rrc.statusCode == 0 {
+		// WriteHeader was never called; treat it as an implicit
+		// 200, as net/http does for a Write without WriteHeader.
+		rrc.statusCode = http.StatusOK
+	}
+	if rrc.statusCode != http.StatusOK {
+		// Not a successful fetch: nothing to cache, pass the
+		// buffered response through as-is.
+		rrc.ResponseWriter.WriteHeader(rrc.statusCode)
+		_, err := rrc.ResponseWriter.Write(rrc.Buffer)
+		return err
+	}
+	if len(rrc.Locator) < 32 {
+		// Slicing rrc.Locator[:32] below would panic on a short
+		// locator; fail the request cleanly instead.
+		err := errors.New("invalid locator")
+		http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadRequest)
+		return err
+	}
+	_, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil)
+	if ctxErr := rrc.Context.Err(); ctxErr != nil {
+		// If caller hung up, report that instead of
+		// subsequent/misleading errors.
+		http.Error(rrc.ResponseWriter, ctxErr.Error(), http.StatusGatewayTimeout)
+		if err == nil {
+			// PutBlock may have succeeded just before the
+			// cancellation; the response still failed.
+			err = ctxErr
+		}
+		return err
+	}
+	if err == RequestHashError {
+		http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway)
+		return err
+	}
+	if kerr, ok := err.(*KeepError); ok {
+		http.Error(rrc.ResponseWriter, kerr.Error(), kerr.HTTPCode)
+		return err
+	}
+	if err != nil {
+		http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway)
+		return err
+	}
+
+	unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "")
+	expiry := time.Now().Add(rrc.Cluster.Collections.BlobSigningTTL.Duration())
+	signed := SignLocator(rrc.Cluster, unsigned, rrc.Token, expiry)
+	if signed == unsigned {
+		// SignLocator returned its input unchanged, so no
+		// signature was added.
+		err = errors.New("could not sign locator")
+		http.Error(rrc.ResponseWriter, err.Error(), http.StatusInternalServerError)
+		return err
+	}
+	rrc.Header().Set("X-Keep-Locator", signed)
+	rrc.ResponseWriter.WriteHeader(rrc.statusCode)
+	_, err = rrc.ResponseWriter.Write(rrc.Buffer)
+	return err
+}