Merge branch 'main' into 21386-project-loading-view
[arvados.git] / services / keepstore / proxy_remote.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "context"
9         "errors"
10         "io"
11         "net/http"
12         "regexp"
13         "strings"
14         "sync"
15         "time"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19         "git.arvados.org/arvados.git/sdk/go/auth"
20         "git.arvados.org/arvados.git/sdk/go/keepclient"
21 )
22
23 type remoteProxy struct {
24         clients map[string]*keepclient.KeepClient
25         mtx     sync.Mutex
26 }
27
28 func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster, volmgr *RRVolumeManager) {
29         // Intervening proxies must not return a cached GET response
30         // to a prior request if a X-Keep-Signature request header has
31         // been added or changed.
32         w.Header().Add("Vary", "X-Keep-Signature")
33
34         token := GetAPIToken(r)
35         if token == "" {
36                 http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
37                 return
38         }
39         if strings.SplitN(r.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" {
40                 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
41                 if err != nil {
42                         http.Error(w, err.Error(), http.StatusServiceUnavailable)
43                         return
44                 }
45                 defer bufs.Put(buf)
46                 rrc := &remoteResponseCacher{
47                         Locator:        r.URL.Path[1:],
48                         Token:          token,
49                         Buffer:         buf[:0],
50                         ResponseWriter: w,
51                         Context:        ctx,
52                         Cluster:        cluster,
53                         VolumeManager:  volmgr,
54                 }
55                 defer rrc.Close()
56                 w = rrc
57         }
58         var remoteClient *keepclient.KeepClient
59         var parts []string
60         for i, part := range strings.Split(r.URL.Path[1:], "+") {
61                 switch {
62                 case i == 0:
63                         // don't try to parse hash part as hint
64                 case strings.HasPrefix(part, "A"):
65                         // drop local permission hint
66                         continue
67                 case len(part) > 7 && part[0] == 'R' && part[6] == '-':
68                         remoteID := part[1:6]
69                         remote, ok := cluster.RemoteClusters[remoteID]
70                         if !ok {
71                                 http.Error(w, "remote cluster not configured", http.StatusBadRequest)
72                                 return
73                         }
74                         kc, err := rp.remoteClient(remoteID, remote, token)
75                         if err == auth.ErrObsoleteToken {
76                                 http.Error(w, err.Error(), http.StatusBadRequest)
77                                 return
78                         } else if err != nil {
79                                 http.Error(w, err.Error(), http.StatusInternalServerError)
80                                 return
81                         }
82                         remoteClient = kc
83                         part = "A" + part[7:]
84                 }
85                 parts = append(parts, part)
86         }
87         if remoteClient == nil {
88                 http.Error(w, "bad request", http.StatusBadRequest)
89                 return
90         }
91         locator := strings.Join(parts, "+")
92         rdr, _, _, err := remoteClient.Get(locator)
93         switch err.(type) {
94         case nil:
95                 defer rdr.Close()
96                 io.Copy(w, rdr)
97         case *keepclient.ErrNotFound:
98                 http.Error(w, err.Error(), http.StatusNotFound)
99         default:
100                 http.Error(w, err.Error(), http.StatusBadGateway)
101         }
102 }
103
104 func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
105         rp.mtx.Lock()
106         kc, ok := rp.clients[remoteID]
107         rp.mtx.Unlock()
108         if !ok {
109                 c := &arvados.Client{
110                         APIHost:   remoteCluster.Host,
111                         AuthToken: "xxx",
112                         Insecure:  remoteCluster.Insecure,
113                 }
114                 ac, err := arvadosclient.New(c)
115                 if err != nil {
116                         return nil, err
117                 }
118                 kc, err = keepclient.MakeKeepClient(ac)
119                 if err != nil {
120                         return nil, err
121                 }
122                 kc.DiskCacheSize = keepclient.DiskCacheDisabled
123
124                 rp.mtx.Lock()
125                 if rp.clients == nil {
126                         rp.clients = map[string]*keepclient.KeepClient{remoteID: kc}
127                 } else {
128                         rp.clients[remoteID] = kc
129                 }
130                 rp.mtx.Unlock()
131         }
132         accopy := *kc.Arvados
133         accopy.ApiToken = token
134         kccopy := kc.Clone()
135         kccopy.Arvados = &accopy
136         token, err := auth.SaltToken(token, remoteID)
137         if err != nil {
138                 return nil, err
139         }
140         kccopy.Arvados.ApiToken = token
141         return kccopy, nil
142 }
143
// localOrRemoteSignature matches one "+A..." or "+R..." hint in a
// locator: a plus sign, then "A" or "R", then everything up to (but
// not including) the next plus sign.
var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`)
145
146 // remoteResponseCacher wraps http.ResponseWriter. It buffers the
147 // response data in the provided buffer, writes/touches a copy on a
148 // local volume, adds a response header with a locally-signed locator,
149 // and finally writes the data through.
150 type remoteResponseCacher struct {
151         Locator       string
152         Token         string
153         Buffer        []byte
154         Context       context.Context
155         Cluster       *arvados.Cluster
156         VolumeManager *RRVolumeManager
157         http.ResponseWriter
158         statusCode int
159 }
160
161 func (rrc *remoteResponseCacher) Write(p []byte) (int, error) {
162         if len(rrc.Buffer)+len(p) > cap(rrc.Buffer) {
163                 return 0, errors.New("buffer full")
164         }
165         rrc.Buffer = append(rrc.Buffer, p...)
166         return len(p), nil
167 }
168
169 func (rrc *remoteResponseCacher) WriteHeader(statusCode int) {
170         rrc.statusCode = statusCode
171 }
172
173 func (rrc *remoteResponseCacher) Close() error {
174         if rrc.statusCode == 0 {
175                 rrc.statusCode = http.StatusOK
176         } else if rrc.statusCode != http.StatusOK {
177                 rrc.ResponseWriter.WriteHeader(rrc.statusCode)
178                 rrc.ResponseWriter.Write(rrc.Buffer)
179                 return nil
180         }
181         _, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil)
182         if rrc.Context.Err() != nil {
183                 // If caller hung up, log that instead of subsequent/misleading errors.
184                 http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout)
185                 return err
186         }
187         if err == RequestHashError {
188                 http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway)
189                 return err
190         }
191         if err, ok := err.(*KeepError); ok {
192                 http.Error(rrc.ResponseWriter, err.Error(), err.HTTPCode)
193                 return err
194         }
195         if err != nil {
196                 http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway)
197                 return err
198         }
199
200         unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "")
201         expiry := time.Now().Add(rrc.Cluster.Collections.BlobSigningTTL.Duration())
202         signed := SignLocator(rrc.Cluster, unsigned, rrc.Token, expiry)
203         if signed == unsigned {
204                 err = errors.New("could not sign locator")
205                 http.Error(rrc.ResponseWriter, err.Error(), http.StatusInternalServerError)
206                 return err
207         }
208         rrc.Header().Set("X-Keep-Locator", signed)
209         rrc.ResponseWriter.WriteHeader(rrc.statusCode)
210         _, err = rrc.ResponseWriter.Write(rrc.Buffer)
211         return err
212 }