14199: Store data if X-Keep-Signature given in proxied GET/HEAD req.
[arvados.git] / services / keepstore / proxy_remote.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "errors"
10         "io"
11         "net/http"
12         "regexp"
13         "strings"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
19         "git.curoverse.com/arvados.git/sdk/go/auth"
20         "git.curoverse.com/arvados.git/sdk/go/keepclient"
21 )
22
// remoteProxy fetches blocks from remote clusters on behalf of local
// clients. It keeps one template keepclient.KeepClient per remote
// cluster ID; per-request copies carrying the caller's token are made
// from these templates in remoteClient.
type remoteProxy struct {
	// clients maps a remote cluster ID to its template client. It is
	// created lazily, so it may be nil until the first remote request.
	clients map[string]*keepclient.KeepClient
	// mtx guards clients.
	mtx sync.Mutex
}
27
28 func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) {
29         token := GetAPIToken(r)
30         if token == "" {
31                 http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
32                 return
33         }
34         if sign := r.Header.Get("X-Keep-Signature"); sign != "" {
35                 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
36                 if err != nil {
37                         http.Error(w, err.Error(), http.StatusServiceUnavailable)
38                         return
39                 }
40                 defer bufs.Put(buf)
41                 rrc := &remoteResponseCacher{
42                         Locator:        r.URL.Path[1:],
43                         Token:          token,
44                         Buffer:         buf[:0],
45                         ResponseWriter: w,
46                 }
47                 defer rrc.Flush(ctx)
48                 w = rrc
49         }
50         var remoteClient *keepclient.KeepClient
51         var parts []string
52         for i, part := range strings.Split(r.URL.Path[1:], "+") {
53                 switch {
54                 case i == 0:
55                         // don't try to parse hash part as hint
56                 case strings.HasPrefix(part, "A"):
57                         // drop local permission hint
58                         continue
59                 case len(part) > 7 && part[0] == 'R' && part[6] == '-':
60                         remoteID := part[1:6]
61                         remote, ok := cluster.RemoteClusters[remoteID]
62                         if !ok {
63                                 http.Error(w, "remote cluster not configured", http.StatusBadGateway)
64                                 return
65                         }
66                         kc, err := rp.remoteClient(remoteID, remote, token)
67                         if err == auth.ErrObsoleteToken {
68                                 http.Error(w, err.Error(), http.StatusBadRequest)
69                                 return
70                         } else if err != nil {
71                                 http.Error(w, err.Error(), http.StatusInternalServerError)
72                                 return
73                         }
74                         remoteClient = kc
75                         part = "A" + part[7:]
76                 }
77                 parts = append(parts, part)
78         }
79         if remoteClient == nil {
80                 http.Error(w, "bad request", http.StatusBadRequest)
81                 return
82         }
83         locator := strings.Join(parts, "+")
84         rdr, _, _, err := remoteClient.Get(locator)
85         switch err.(type) {
86         case nil:
87                 defer rdr.Close()
88                 io.Copy(w, rdr)
89         case *keepclient.ErrNotFound:
90                 http.Error(w, err.Error(), http.StatusNotFound)
91         default:
92                 http.Error(w, err.Error(), http.StatusBadGateway)
93         }
94 }
95
96 func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
97         rp.mtx.Lock()
98         kc, ok := rp.clients[remoteID]
99         rp.mtx.Unlock()
100         if !ok {
101                 c := &arvados.Client{
102                         APIHost:   remoteCluster.Host,
103                         AuthToken: "xxx",
104                         Insecure:  remoteCluster.Insecure,
105                 }
106                 ac, err := arvadosclient.New(c)
107                 if err != nil {
108                         return nil, err
109                 }
110                 kc, err = keepclient.MakeKeepClient(ac)
111                 if err != nil {
112                         return nil, err
113                 }
114
115                 rp.mtx.Lock()
116                 if rp.clients == nil {
117                         rp.clients = map[string]*keepclient.KeepClient{remoteID: kc}
118                 } else {
119                         rp.clients[remoteID] = kc
120                 }
121                 rp.mtx.Unlock()
122         }
123         accopy := *kc.Arvados
124         accopy.ApiToken = token
125         kccopy := *kc
126         kccopy.Arvados = &accopy
127         token, err := auth.SaltToken(token, remoteID)
128         if err != nil {
129                 return nil, err
130         }
131         kccopy.Arvados.ApiToken = token
132         return &kccopy, nil
133 }
134
// localOrRemoteSignature matches a locator hint that starts with "+A"
// or "+R", up to (but not including) the next "+" or the end of the
// string.
var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`)
136
// remoteResponseCacher is an http.ResponseWriter wrapper that holds
// back the response instead of sending it. Body bytes accumulate in
// Buffer and the status code is only recorded; nothing reaches the
// wrapped ResponseWriter until Flush is called, which stores the block
// locally, sets a response header containing a locally signed locator,
// and then sends the buffered data.
type remoteResponseCacher struct {
	Locator string
	Token   string
	Buffer  []byte
	http.ResponseWriter
	statusCode int
}

// Write appends p to the buffer without growing it. If p does not fit
// in the buffer's remaining capacity, nothing is stored and a
// "buffer full" error is returned.
func (rrc *remoteResponseCacher) Write(p []byte) (int, error) {
	if free := cap(rrc.Buffer) - len(rrc.Buffer); len(p) > free {
		return 0, errors.New("buffer full")
	}
	rrc.Buffer = append(rrc.Buffer, p...)
	return len(p), nil
}

// WriteHeader remembers the status code for later; it does not write
// anything to the wrapped ResponseWriter.
func (rrc *remoteResponseCacher) WriteHeader(statusCode int) {
	rrc.statusCode = statusCode
}
160
161 func (rrc *remoteResponseCacher) Flush(ctx context.Context) {
162         if rrc.statusCode == 0 {
163                 rrc.statusCode = http.StatusOK
164         } else if rrc.statusCode != http.StatusOK {
165                 rrc.ResponseWriter.WriteHeader(rrc.statusCode)
166                 rrc.ResponseWriter.Write(rrc.Buffer)
167                 return
168         }
169         _, err := PutBlock(ctx, rrc.Buffer, rrc.Locator[:32])
170         if err == RequestHashError {
171                 http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway)
172                 return
173         }
174         if err, ok := err.(*KeepError); ok {
175                 http.Error(rrc.ResponseWriter, err.Error(), err.HTTPCode)
176                 return
177         }
178         if err != nil {
179                 http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway)
180                 return
181         }
182
183         unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "")
184         signed := SignLocator(unsigned, rrc.Token, time.Now().Add(theConfig.BlobSignatureTTL.Duration()))
185         if signed == unsigned {
186                 http.Error(rrc.ResponseWriter, "could not sign locator", http.StatusInternalServerError)
187                 return
188         }
189         rrc.Header().Set("X-Keep-Locator", signed)
190         rrc.ResponseWriter.WriteHeader(rrc.statusCode)
191         rrc.ResponseWriter.Write(rrc.Buffer)
192 }