2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / proxy_remote.go
diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go
deleted file mode 100644 (file)
index 325f1cf..0000000
+++ /dev/null
@@ -1,212 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package keepstore
-
-import (
-       "context"
-       "errors"
-       "io"
-       "net/http"
-       "regexp"
-       "strings"
-       "sync"
-       "time"
-
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
-       "git.arvados.org/arvados.git/sdk/go/auth"
-       "git.arvados.org/arvados.git/sdk/go/keepclient"
-)
-
-// remoteProxy serves GET requests for blocks held by remote clusters.
-// It keeps one keepclient per remote cluster ID; remoteClient hands
-// out clones of those clients with a per-request token.
-type remoteProxy struct {
-       // clients maps a remote cluster ID to the client built for it
-       // by remoteClient. It is nil until the first client is stored,
-       // and is read and written with mtx held.
-       clients map[string]*keepclient.KeepClient
-       mtx     sync.Mutex
-}
-
-// Get proxies a GET request for a block stored on a remote cluster.
-//
-// The request path (minus its leading "/") is treated as a locator
-// made of "+"-separated parts. The first part is passed through
-// untouched, "+A..." hints are dropped, and a "+Rzzzzz-sig" hint
-// selects the remote cluster "zzzzz" (which must appear in
-// cluster.RemoteClusters) and is rewritten to "+Asig" before the
-// block is requested from that cluster. If several "+R" hints are
-// present, the client for the last one is used.
-//
-// If the first comma-separated value of the X-Keep-Signature request
-// header is "local", the response is routed through a
-// remoteResponseCacher instead of being written directly to w.
-//
-// Error responses: 401 if no token is supplied; 503 if no buffer can
-// be obtained; 400 if there is no usable "+R" hint, the remote
-// cluster is not configured, or the token is obsolete; 500 for other
-// client setup errors; 404 or 502 according to the error returned by
-// the remote fetch.
-func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster, volmgr *RRVolumeManager) {
-       // Intervening proxies must not return a cached GET response
-       // to a prior request if a X-Keep-Signature request header has
-       // been added or changed.
-       w.Header().Add("Vary", "X-Keep-Signature")
-
-       token := GetAPIToken(r)
-       if token == "" {
-               http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
-               return
-       }
-
-       // TrimPrefix instead of Path[1:], which panics on an empty path.
-       reqLocator := strings.TrimPrefix(r.URL.Path, "/")
-
-       if strings.SplitN(r.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" {
-               buf, err := getBufferWithContext(ctx, bufs, BlockSize)
-               if err != nil {
-                       http.Error(w, err.Error(), http.StatusServiceUnavailable)
-                       return
-               }
-               // Deferred calls run last-in-first-out: rrc.Close
-               // (below) finishes using buf before it is returned to
-               // the pool here.
-               defer bufs.Put(buf)
-               rrc := &remoteResponseCacher{
-                       Locator:        reqLocator,
-                       Token:          token,
-                       Buffer:         buf[:0],
-                       ResponseWriter: w,
-                       Context:        ctx,
-                       Cluster:        cluster,
-                       VolumeManager:  volmgr,
-               }
-               defer rrc.Close()
-               // From here on, everything written to w (including
-               // error responses) is held by rrc until Close.
-               w = rrc
-       }
-
-       var remoteClient *keepclient.KeepClient
-       var parts []string
-       for i, part := range strings.Split(reqLocator, "+") {
-               switch {
-               case i == 0:
-                       // don't try to parse hash part as hint
-               case strings.HasPrefix(part, "A"):
-                       // drop local permission hint
-                       continue
-               case len(part) > 7 && part[0] == 'R' && part[6] == '-':
-                       // "Rzzzzz-sig": 5-character cluster ID, then
-                       // the signature to present to that cluster.
-                       remoteID := part[1:6]
-                       remote, ok := cluster.RemoteClusters[remoteID]
-                       if !ok {
-                               http.Error(w, "remote cluster not configured", http.StatusBadRequest)
-                               return
-                       }
-                       kc, err := rp.remoteClient(remoteID, remote, token)
-                       if errors.Is(err, auth.ErrObsoleteToken) {
-                               http.Error(w, err.Error(), http.StatusBadRequest)
-                               return
-                       }
-                       if err != nil {
-                               http.Error(w, err.Error(), http.StatusInternalServerError)
-                               return
-                       }
-                       remoteClient = kc
-                       part = "A" + part[7:]
-               }
-               parts = append(parts, part)
-       }
-       if remoteClient == nil {
-               http.Error(w, "bad request", http.StatusBadRequest)
-               return
-       }
-
-       locator := strings.Join(parts, "+")
-       rdr, _, _, err := remoteClient.Get(locator)
-       switch err.(type) {
-       case nil:
-               defer rdr.Close()
-               // The copy error is not reported: by the time it
-               // occurs the status may already have been sent.
-               io.Copy(w, rdr)
-       case *keepclient.ErrNotFound:
-               http.Error(w, err.Error(), http.StatusNotFound)
-       default:
-               http.Error(w, err.Error(), http.StatusBadGateway)
-       }
-}
-
-func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
-       rp.mtx.Lock()
-       kc, ok := rp.clients[remoteID]
-       rp.mtx.Unlock()
-       if !ok {
-               c := &arvados.Client{
-                       APIHost:   remoteCluster.Host,
-                       AuthToken: "xxx",
-                       Insecure:  remoteCluster.Insecure,
-               }
-               ac, err := arvadosclient.New(c)
-               if err != nil {
-                       return nil, err
-               }
-               kc, err = keepclient.MakeKeepClient(ac)
-               if err != nil {
-                       return nil, err
-               }
-               kc.DiskCacheSize = keepclient.DiskCacheDisabled
-
-               rp.mtx.Lock()
-               if rp.clients == nil {
-                       rp.clients = map[string]*keepclient.KeepClient{remoteID: kc}
-               } else {
-                       rp.clients[remoteID] = kc
-               }
-               rp.mtx.Unlock()
-       }
-       accopy := *kc.Arvados
-       accopy.ApiToken = token
-       kccopy := kc.Clone()
-       kccopy.Arvados = &accopy
-       token, err := auth.SaltToken(token, remoteID)
-       if err != nil {
-               return nil, err
-       }
-       kccopy.Arvados.ApiToken = token
-       return kccopy, nil
-}
-
-// localOrRemoteSignature matches a "+A..." or "+R..." hint in a
-// locator: the "+", the letter, and everything up to the next "+".
-// remoteResponseCacher.Close uses it to strip those hints before
-// signing the locator.
-var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`)
-
-// remoteResponseCacher wraps http.ResponseWriter. It buffers the
-// response data in the provided buffer, writes/touches a copy on a
-// local volume, adds a response header with a locally-signed locator,
-// and finally writes the data through.
-//
-// Write and WriteHeader only record data and status; nothing is sent
-// to the embedded ResponseWriter until Close is called.
-//
-// Fields:
-//   - Locator: the locator being served; Close passes its first 32
-//     characters to PutBlock and signs it after stripping +A/+R hints.
-//   - Token: the token passed to SignLocator.
-//   - Buffer: accumulates the response body; Write never grows it
-//     beyond its initial capacity.
-//   - Context: passed to PutBlock and checked for cancellation in Close.
-//   - Cluster: supplies the BlobSigningTTL and is passed to SignLocator.
-//   - VolumeManager: passed to PutBlock.
-//   - statusCode: the status recorded by WriteHeader; 0 means none
-//     was recorded, which Close treats as 200.
-type remoteResponseCacher struct {
-       Locator       string
-       Token         string
-       Buffer        []byte
-       Context       context.Context
-       Cluster       *arvados.Cluster
-       VolumeManager *RRVolumeManager
-       http.ResponseWriter
-       statusCode int
-}
-
-func (rrc *remoteResponseCacher) Write(p []byte) (int, error) {
-       if len(rrc.Buffer)+len(p) > cap(rrc.Buffer) {
-               return 0, errors.New("buffer full")
-       }
-       rrc.Buffer = append(rrc.Buffer, p...)
-       return len(p), nil
-}
-
-// WriteHeader records statusCode without forwarding it; Close sends
-// a status to the wrapped ResponseWriter later.
-func (rrc *remoteResponseCacher) WriteHeader(statusCode int) {
-       rrc.statusCode = statusCode
-}
-
-func (rrc *remoteResponseCacher) Close() error {
-       if rrc.statusCode == 0 {
-               rrc.statusCode = http.StatusOK
-       } else if rrc.statusCode != http.StatusOK {
-               rrc.ResponseWriter.WriteHeader(rrc.statusCode)
-               rrc.ResponseWriter.Write(rrc.Buffer)
-               return nil
-       }
-       _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil)
-       if rrc.Context.Err() != nil {
-               // If caller hung up, log that instead of subsequent/misleading errors.
-               http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout)
-               return err
-       }
-       if err == RequestHashError {
-               http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway)
-               return err
-       }
-       if err, ok := err.(*KeepError); ok {
-               http.Error(rrc.ResponseWriter, err.Error(), err.HTTPCode)
-               return err
-       }
-       if err != nil {
-               http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway)
-               return err
-       }
-
-       unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "")
-       expiry := time.Now().Add(rrc.Cluster.Collections.BlobSigningTTL.Duration())
-       signed := SignLocator(rrc.Cluster, unsigned, rrc.Token, expiry)
-       if signed == unsigned {
-               err = errors.New("could not sign locator")
-               http.Error(rrc.ResponseWriter, err.Error(), http.StatusInternalServerError)
-               return err
-       }
-       rrc.Header().Set("X-Keep-Locator", signed)
-       rrc.ResponseWriter.WriteHeader(rrc.statusCode)
-       _, err = rrc.ResponseWriter.Write(rrc.Buffer)
-       return err
-}