1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19 "git.arvados.org/arvados.git/sdk/go/auth"
20 "git.arvados.org/arvados.git/sdk/go/keepclient"
// remoteProxy serves GET requests for blocks held by remote clusters.
// It keeps KeepClient instances in the clients map so that
// remoteClient can look them up again by remote cluster ID.
23 type remoteProxy struct {
// clients is indexed by remote cluster ID; remoteClient creates the
// map on first insert.
24 clients map[string]*keepclient.KeepClient
// Get proxies a GET for the locator in r.URL.Path (minus its leading
// character) to a remote cluster named by a hint inside that locator.
//
// The locator is split on "+"; parts are collected into a new locator,
// and a part shaped like a remote hint selects the KeepClient (via
// rp.remoteClient) that the rebuilt locator is fetched from. Failures
// are reported to w with http.Error:
//   - 401 "no token provided in Authorization header"
//   - 503 when getBufferWithContext fails
//   - 400 for an unconfigured remote cluster, an obsolete token, or
//     when no remote client was selected ("bad request")
//   - 500 for any other rp.remoteClient error
//   - 404 for *keepclient.ErrNotFound from the remote Get
//   - 502 for the other remote Get error branch (presumably the
//     default case — confirm)
28 func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster, volmgr *RRVolumeManager) {
29 // Intervening proxies must not return a cached GET response
30 // to a prior request if an X-Keep-Signature request header has
31 // been added or changed.
32 w.Header().Add("Vary", "X-Keep-Signature")
34 token := GetAPIToken(r)
36 http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
// Only the first comma-separated element of X-Keep-Signature is
// examined. When it is "local", a BlockSize buffer is requested from
// bufs and a remoteResponseCacher is built for this request
// (presumably it then stands in for w so the response is cached
// locally — confirm).
39 if strings.SplitN(r.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" {
40 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
42 http.Error(w, err.Error(), http.StatusServiceUnavailable)
46 rrc := &remoteResponseCacher{
47 Locator: r.URL.Path[1:],
53 VolumeManager: volmgr,
58 var remoteClient *keepclient.KeepClient
// Walk the "+"-separated locator parts, deciding per part whether it
// is kept, dropped, or used to pick the remote cluster.
60 for i, part := range strings.Split(r.URL.Path[1:], "+") {
63 // don't try to parse hash part as hint
64 case strings.HasPrefix(part, "A"):
65 // drop local permission hint
// Remote hint: longer than 7 bytes, starting with 'R' and with '-' at
// index 6 (presumably the bytes in between are the remote cluster ID
// — confirm).
67 case len(part) > 7 && part[0] == 'R' && part[6] == '-':
69 remote, ok := cluster.RemoteClusters[remoteID]
71 http.Error(w, "remote cluster not configured", http.StatusBadRequest)
74 kc, err := rp.remoteClient(remoteID, remote, token)
// An obsolete token is reported as a client error (400); any other
// failure to obtain a client is reported as a server error (500).
75 if err == auth.ErrObsoleteToken {
76 http.Error(w, err.Error(), http.StatusBadRequest)
78 } else if err != nil {
79 http.Error(w, err.Error(), http.StatusInternalServerError)
85 parts = append(parts, part)
// remoteClient still being nil after the loop means there is no
// remote cluster to fetch from.
87 if remoteClient == nil {
88 http.Error(w, "bad request", http.StatusBadRequest)
91 locator := strings.Join(parts, "+")
// Only the reader and the error returned by the remote Get are used;
// the two middle return values are discarded.
92 rdr, _, _, err := remoteClient.Get(locator)
97 case *keepclient.ErrNotFound:
98 http.Error(w, err.Error(), http.StatusNotFound)
100 http.Error(w, err.Error(), http.StatusBadGateway)
// remoteClient returns a KeepClient for the remote cluster remoteID
// whose Arvados client's ApiToken is the salted form of token.
//
// A per-cluster client is looked up in rp.clients; when a new one is
// needed it is built from remoteCluster.Host and remoteCluster.Insecure
// and stored in rp.clients under remoteID. The token is applied to
// copies (accopy, kccopy) rather than to kc.Arvados itself.
104 func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
106 kc, ok := rp.clients[remoteID]
109 c := &arvados.Client{
110 APIHost: remoteCluster.Host,
112 Insecure: remoteCluster.Insecure,
114 ac, err := arvadosclient.New(c)
118 kc, err = keepclient.MakeKeepClient(ac)
// Newly made clients get DiskCacheSize set to the DiskCacheDisabled
// constant.
122 kc.DiskCacheSize = keepclient.DiskCacheDisabled
// rp.clients is created lazily: a nil map is replaced by a map literal
// holding this entry, otherwise the entry is assigned into the
// existing map.
125 if rp.clients == nil {
126 rp.clients = map[string]*keepclient.KeepClient{remoteID: kc}
128 rp.clients[remoteID] = kc
// accopy is a value copy of *kc.Arvados, so setting its ApiToken does
// not write through kc.Arvados.
132 accopy := *kc.Arvados
133 accopy.ApiToken = token
135 kccopy.Arvados = &accopy
// The raw token set above is then overwritten with the result of
// auth.SaltToken for this remoteID.
136 token, err := auth.SaltToken(token, remoteID)
140 kccopy.Arvados.ApiToken = token
// localOrRemoteSignature matches one "+A..." or "+R..." locator hint:
// a literal '+', then 'A' or 'R', then any run of characters other
// than '+'. remoteResponseCacher.Close uses it to remove such hints
// from the locator before signing it.
144 var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`)
146 // remoteResponseCacher wraps http.ResponseWriter. It buffers the
147 // response data in the provided buffer, writes/touches a copy on a
148 // local volume, adds a response header with a locally-signed locator,
149 // and finally writes the data through.
//
// Write and WriteHeader only record data and status on the struct; the
// wrapped ResponseWriter is written to from Close.
150 type remoteResponseCacher struct {
// Context is passed to PutBlock and checked by Close for
// cancellation before other errors are reported.
154 Context context.Context
// Cluster supplies Collections.BlobSigningTTL for the signature
// expiry and is passed to SignLocator.
155 Cluster *arvados.Cluster
// VolumeManager is passed to PutBlock when the buffered data is
// stored.
156 VolumeManager *RRVolumeManager
// Write appends p to rrc.Buffer without ever growing the buffer past
// its existing capacity. If p does not fit in the remaining capacity,
// nothing is appended and it returns 0 with a "buffer full" error.
161 func (rrc *remoteResponseCacher) Write(p []byte) (int, error) {
// Compare against cap, not len: the append below must reuse the
// buffer's existing backing array instead of reallocating.
162 if len(rrc.Buffer)+len(p) > cap(rrc.Buffer) {
163 return 0, errors.New("buffer full")
165 rrc.Buffer = append(rrc.Buffer, p...)
// WriteHeader records statusCode on rrc. The status is sent to the
// wrapped ResponseWriter later, by Close.
169 func (rrc *remoteResponseCacher) WriteHeader(statusCode int) {
170 rrc.statusCode = statusCode
// Close finishes the buffered response.
//
// A recorded status other than 200 is passed through to the wrapped
// ResponseWriter together with the buffered body. Otherwise the buffer
// is stored with PutBlock, the locator is stripped of +A/+R hints and
// re-signed with SignLocator, and the signed locator is sent in the
// X-Keep-Locator header ahead of the status and buffered data.
//
// Failures are reported to the wrapped ResponseWriter via http.Error:
// 504 if rrc.Context has an error, 502 for RequestHashError, the
// error's own HTTPCode for a *KeepError, 502 for the further error
// branch at the end of that chain, and 500 if signing leaves the
// locator unchanged.
173 func (rrc *remoteResponseCacher) Close() error {
// A zero statusCode (WriteHeader never recorded one) is treated as
// 200 OK.
174 if rrc.statusCode == 0 {
175 rrc.statusCode = http.StatusOK
176 } else if rrc.statusCode != http.StatusOK {
177 rrc.ResponseWriter.WriteHeader(rrc.statusCode)
178 rrc.ResponseWriter.Write(rrc.Buffer)
// The block is stored under the first 32 bytes of the locator
// (presumably the hash part — confirm).
// NOTE(review): rrc.Locator[:32] panics if Locator is shorter than 32
// bytes — confirm callers guarantee the length.
181 _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil)
182 if rrc.Context.Err() != nil {
183 // If caller hung up, log that instead of subsequent/misleading errors.
184 http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout)
// RequestHashError from PutBlock is reported to the client as a
// checksum mismatch in the remote response.
187 if err == RequestHashError {
188 http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway)
// A *KeepError supplies its own HTTP status code.
191 if err, ok := err.(*KeepError); ok {
192 http.Error(rrc.ResponseWriter, err.Error(), err.HTTPCode)
196 http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway)
// Remove every +A.../+R... hint, then sign the bare locator with an
// expiry of now + BlobSigningTTL.
200 unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "")
201 expiry := time.Now().Add(rrc.Cluster.Collections.BlobSigningTTL.Duration())
202 signed := SignLocator(rrc.Cluster, unsigned, rrc.Token, expiry)
// SignLocator handing back its input unchanged is treated as a
// signing failure.
203 if signed == unsigned {
204 err = errors.New("could not sign locator")
205 http.Error(rrc.ResponseWriter, err.Error(), http.StatusInternalServerError)
// The header must be set before WriteHeader sends the status line.
208 rrc.Header().Set("X-Keep-Locator", signed)
209 rrc.ResponseWriter.WriteHeader(rrc.statusCode)
210 _, err = rrc.ResponseWriter.Write(rrc.Buffer)