14199: Require "X-Keep-Signature: local[, ...]" to invoke signing.
[arvados.git] / services / keepstore / proxy_remote.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "errors"
10         "io"
11         "net/http"
12         "regexp"
13         "strings"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
19         "git.curoverse.com/arvados.git/sdk/go/auth"
20         "git.curoverse.com/arvados.git/sdk/go/keepclient"
21 )
22
// remoteProxy handles requests for blocks that have to be fetched from a
// remote cluster. It keeps one KeepClient per remote cluster ID so that
// later requests for the same remote can start from the stored client.
type remoteProxy struct {
	clients map[string]*keepclient.KeepClient // remote cluster ID -> stored client; nil until the first client is stored
	mtx     sync.Mutex                        // held around every read and write of clients
}
27
28 func (rp *remoteProxy) Get(ctx context.Context, w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) {
29         // Intervening proxies must not return a cached GET response
30         // to a prior request if a X-Keep-Signature request header has
31         // been added or changed.
32         w.Header().Add("Vary", "X-Keep-Signature")
33
34         token := GetAPIToken(r)
35         if token == "" {
36                 http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
37                 return
38         }
39         if strings.SplitN(r.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" {
40                 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
41                 if err != nil {
42                         http.Error(w, err.Error(), http.StatusServiceUnavailable)
43                         return
44                 }
45                 defer bufs.Put(buf)
46                 rrc := &remoteResponseCacher{
47                         Locator:        r.URL.Path[1:],
48                         Token:          token,
49                         Buffer:         buf[:0],
50                         ResponseWriter: w,
51                 }
52                 defer rrc.Flush(ctx)
53                 w = rrc
54         }
55         var remoteClient *keepclient.KeepClient
56         var parts []string
57         for i, part := range strings.Split(r.URL.Path[1:], "+") {
58                 switch {
59                 case i == 0:
60                         // don't try to parse hash part as hint
61                 case strings.HasPrefix(part, "A"):
62                         // drop local permission hint
63                         continue
64                 case len(part) > 7 && part[0] == 'R' && part[6] == '-':
65                         remoteID := part[1:6]
66                         remote, ok := cluster.RemoteClusters[remoteID]
67                         if !ok {
68                                 http.Error(w, "remote cluster not configured", http.StatusBadGateway)
69                                 return
70                         }
71                         kc, err := rp.remoteClient(remoteID, remote, token)
72                         if err == auth.ErrObsoleteToken {
73                                 http.Error(w, err.Error(), http.StatusBadRequest)
74                                 return
75                         } else if err != nil {
76                                 http.Error(w, err.Error(), http.StatusInternalServerError)
77                                 return
78                         }
79                         remoteClient = kc
80                         part = "A" + part[7:]
81                 }
82                 parts = append(parts, part)
83         }
84         if remoteClient == nil {
85                 http.Error(w, "bad request", http.StatusBadRequest)
86                 return
87         }
88         locator := strings.Join(parts, "+")
89         rdr, _, _, err := remoteClient.Get(locator)
90         switch err.(type) {
91         case nil:
92                 defer rdr.Close()
93                 io.Copy(w, rdr)
94         case *keepclient.ErrNotFound:
95                 http.Error(w, err.Error(), http.StatusNotFound)
96         default:
97                 http.Error(w, err.Error(), http.StatusBadGateway)
98         }
99 }
100
101 func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
102         rp.mtx.Lock()
103         kc, ok := rp.clients[remoteID]
104         rp.mtx.Unlock()
105         if !ok {
106                 c := &arvados.Client{
107                         APIHost:   remoteCluster.Host,
108                         AuthToken: "xxx",
109                         Insecure:  remoteCluster.Insecure,
110                 }
111                 ac, err := arvadosclient.New(c)
112                 if err != nil {
113                         return nil, err
114                 }
115                 kc, err = keepclient.MakeKeepClient(ac)
116                 if err != nil {
117                         return nil, err
118                 }
119
120                 rp.mtx.Lock()
121                 if rp.clients == nil {
122                         rp.clients = map[string]*keepclient.KeepClient{remoteID: kc}
123                 } else {
124                         rp.clients[remoteID] = kc
125                 }
126                 rp.mtx.Unlock()
127         }
128         accopy := *kc.Arvados
129         accopy.ApiToken = token
130         kccopy := *kc
131         kccopy.Arvados = &accopy
132         token, err := auth.SaltToken(token, remoteID)
133         if err != nil {
134                 return nil, err
135         }
136         kccopy.Arvados.ApiToken = token
137         return &kccopy, nil
138 }
139
// localOrRemoteSignature matches one "+A..." or "+R..." locator hint, up
// to but not including the next "+". Flush uses it to strip those hints
// from a locator before signing it.
var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^\+]*`)
141
// remoteResponseCacher wraps http.ResponseWriter. It buffers the
// response data in the provided buffer, writes/touches a copy on a
// local volume, adds a response header with a locally-signed locator,
// and finally writes the data through.
//
// Write and WriteHeader only record what they are given; the storing,
// signing and writing through all happen in Flush, which must be called
// once the response is complete.
type remoteResponseCacher struct {
	Locator string // locator taken from the request; Flush uses its first 32 characters as the block hash
	Token   string // token Flush passes to SignLocator
	Buffer  []byte // response body collected so far; Write never grows it beyond its capacity
	// ResponseWriter is the writer being wrapped. Flush sends the
	// status and body to it.
	http.ResponseWriter
	// statusCode is the value most recently passed to WriteHeader, or
	// 0 if WriteHeader has not been called.
	statusCode int
}
153
154 func (rrc *remoteResponseCacher) Write(p []byte) (int, error) {
155         if len(rrc.Buffer)+len(p) > cap(rrc.Buffer) {
156                 return 0, errors.New("buffer full")
157         }
158         rrc.Buffer = append(rrc.Buffer, p...)
159         return len(p), nil
160 }
161
// WriteHeader records statusCode without sending anything to the
// wrapped ResponseWriter. Flush later decides, based on the recorded
// value, what status the client actually receives. A later call
// replaces the value recorded by an earlier one.
func (rrc *remoteResponseCacher) WriteHeader(statusCode int) {
	rrc.statusCode = statusCode
}
165
166 func (rrc *remoteResponseCacher) Flush(ctx context.Context) {
167         if rrc.statusCode == 0 {
168                 rrc.statusCode = http.StatusOK
169         } else if rrc.statusCode != http.StatusOK {
170                 rrc.ResponseWriter.WriteHeader(rrc.statusCode)
171                 rrc.ResponseWriter.Write(rrc.Buffer)
172                 return
173         }
174         _, err := PutBlock(ctx, rrc.Buffer, rrc.Locator[:32])
175         if err == RequestHashError {
176                 http.Error(rrc.ResponseWriter, "checksum mismatch in remote response", http.StatusBadGateway)
177                 return
178         }
179         if err, ok := err.(*KeepError); ok {
180                 http.Error(rrc.ResponseWriter, err.Error(), err.HTTPCode)
181                 return
182         }
183         if err != nil {
184                 http.Error(rrc.ResponseWriter, err.Error(), http.StatusBadGateway)
185                 return
186         }
187
188         unsigned := localOrRemoteSignature.ReplaceAllLiteralString(rrc.Locator, "")
189         signed := SignLocator(unsigned, rrc.Token, time.Now().Add(theConfig.BlobSignatureTTL.Duration()))
190         if signed == unsigned {
191                 http.Error(rrc.ResponseWriter, "could not sign locator", http.StatusInternalServerError)
192                 return
193         }
194         rrc.Header().Set("X-Keep-Locator", signed)
195         rrc.ResponseWriter.WriteHeader(rrc.statusCode)
196         rrc.ResponseWriter.Write(rrc.Buffer)
197 }