14287: Reword filter restrictions.
[arvados.git] / lib / controller / federation / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package federation
6
7 import (
8         "context"
9         "crypto/md5"
10         "errors"
11         "fmt"
12         "net/http"
13         "net/url"
14         "regexp"
15         "strings"
16
17         "git.curoverse.com/arvados.git/lib/controller/railsproxy"
18         "git.curoverse.com/arvados.git/lib/controller/rpc"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/auth"
21         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
22 )
23
24 type Conn struct {
25         cluster *arvados.Cluster
26         local   backend
27         remotes map[string]backend
28 }
29
30 func New(cluster *arvados.Cluster) *Conn {
31         local := railsproxy.NewConn(cluster)
32         remotes := map[string]backend{}
33         for id, remote := range cluster.RemoteClusters {
34                 if !remote.Proxy {
35                         continue
36                 }
37                 remotes[id] = rpc.NewConn(id, &url.URL{Scheme: remote.Scheme, Host: remote.Host}, remote.Insecure, saltedTokenProvider(local, id))
38         }
39
40         return &Conn{
41                 cluster: cluster,
42                 local:   local,
43                 remotes: remotes,
44         }
45 }
46
47 // Return a new rpc.TokenProvider that takes the client-provided
48 // tokens from an incoming request context, determines whether they
49 // should (and can) be salted for the given remoteID, and returns the
50 // resulting tokens.
51 func saltedTokenProvider(local backend, remoteID string) rpc.TokenProvider {
52         return func(ctx context.Context) ([]string, error) {
53                 var tokens []string
54                 incoming, ok := auth.FromContext(ctx)
55                 if !ok {
56                         return nil, errors.New("no token provided")
57                 }
58                 for _, token := range incoming.Tokens {
59                         salted, err := auth.SaltToken(token, remoteID)
60                         switch err {
61                         case nil:
62                                 tokens = append(tokens, salted)
63                         case auth.ErrSalted:
64                                 tokens = append(tokens, token)
65                         case auth.ErrObsoleteToken:
66                                 ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{token}})
67                                 aca, err := local.APIClientAuthorizationCurrent(ctx, arvados.GetOptions{})
68                                 if errStatus(err) == http.StatusUnauthorized {
69                                         // pass through unmodified
70                                         tokens = append(tokens, token)
71                                         continue
72                                 } else if err != nil {
73                                         return nil, err
74                                 }
75                                 salted, err := auth.SaltToken(aca.TokenV2(), remoteID)
76                                 if err != nil {
77                                         return nil, err
78                                 }
79                                 tokens = append(tokens, salted)
80                         default:
81                                 return nil, err
82                         }
83                 }
84                 return tokens, nil
85         }
86 }
87
88 // Return suitable backend for a query about the given cluster ID
89 // ("aaaaa") or object UUID ("aaaaa-dz642-abcdefghijklmno").
90 func (conn *Conn) chooseBackend(id string) backend {
91         if len(id) == 27 {
92                 id = id[:5]
93         } else if len(id) != 5 {
94                 // PDH or bogus ID
95                 return conn.local
96         }
97         if id == conn.cluster.ClusterID {
98                 return conn.local
99         } else if be, ok := conn.remotes[id]; ok {
100                 return be
101         } else {
102                 // TODO: return an "always error" backend?
103                 return conn.local
104         }
105 }
106
107 // Call fn with the local backend; then, if fn returned 404, call fn
108 // on the available remote backends (possibly concurrently) until one
109 // succeeds.
110 //
111 // The second argument to fn is the cluster ID of the remote backend,
112 // or "" for the local backend.
113 //
114 // A non-nil error means all backends failed.
115 func (conn *Conn) tryLocalThenRemotes(ctx context.Context, fn func(context.Context, string, backend) error) error {
116         if err := fn(ctx, "", conn.local); err == nil || errStatus(err) != http.StatusNotFound {
117                 return err
118         }
119
120         ctx, cancel := context.WithCancel(ctx)
121         defer cancel()
122         errchan := make(chan error, len(conn.remotes))
123         for remoteID, be := range conn.remotes {
124                 remoteID, be := remoteID, be
125                 go func() {
126                         errchan <- fn(ctx, remoteID, be)
127                 }()
128         }
129         all404 := true
130         var errs []error
131         for i := 0; i < cap(errchan); i++ {
132                 err := <-errchan
133                 if err == nil {
134                         return nil
135                 }
136                 all404 = all404 && errStatus(err) == http.StatusNotFound
137                 errs = append(errs, err)
138         }
139         if all404 {
140                 return notFoundError{}
141         }
142         // FIXME: choose appropriate HTTP status
143         return fmt.Errorf("errors: %v", errs)
144 }
145
146 func (conn *Conn) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
147         return conn.chooseBackend(options.ClusterID).CollectionCreate(ctx, options)
148 }
149
150 func (conn *Conn) CollectionUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Collection, error) {
151         return conn.chooseBackend(options.UUID).CollectionUpdate(ctx, options)
152 }
153
154 func rewriteManifest(mt, remoteID string) string {
155         return regexp.MustCompile(` [0-9a-f]{32}\+[^ ]*`).ReplaceAllStringFunc(mt, func(tok string) string {
156                 return strings.Replace(tok, "+A", "+R"+remoteID+"-", -1)
157         })
158 }
159
160 // this could be in sdk/go/arvados
161 func portableDataHash(mt string) string {
162         h := md5.New()
163         blkRe := regexp.MustCompile(`^ [0-9a-f]{32}\+\d+`)
164         size := 0
165         _ = regexp.MustCompile(` ?[^ ]*`).ReplaceAllFunc([]byte(mt), func(tok []byte) []byte {
166                 if m := blkRe.Find(tok); m != nil {
167                         // write hash+size, ignore remaining block hints
168                         tok = m
169                 }
170                 n, err := h.Write(tok)
171                 if err != nil {
172                         panic(err)
173                 }
174                 size += n
175                 return nil
176         })
177         return fmt.Sprintf("%x+%d", h.Sum(nil), size)
178 }
179
180 func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error) {
181         if len(options.UUID) == 27 {
182                 // UUID is really a UUID
183                 c, err := conn.chooseBackend(options.UUID).CollectionGet(ctx, options)
184                 if err == nil && options.UUID[:5] != conn.cluster.ClusterID {
185                         c.ManifestText = rewriteManifest(c.ManifestText, options.UUID[:5])
186                 }
187                 return c, err
188         } else {
189                 // UUID is a PDH
190                 first := make(chan arvados.Collection, 1)
191                 err := conn.tryLocalThenRemotes(ctx, func(ctx context.Context, remoteID string, be backend) error {
192                         c, err := be.CollectionGet(ctx, options)
193                         if err != nil {
194                                 return err
195                         }
196                         // options.UUID is either hash+size or
197                         // hash+size+hints; only hash+size need to
198                         // match the computed PDH.
199                         if pdh := portableDataHash(c.ManifestText); pdh != options.UUID && !strings.HasPrefix(options.UUID, pdh+"+") {
200                                 ctxlog.FromContext(ctx).Warnf("bad portable data hash %q received from remote %q (expected %q)", pdh, remoteID, options.UUID)
201                                 return notFoundError{}
202                         }
203                         if remoteID != "" {
204                                 c.ManifestText = rewriteManifest(c.ManifestText, remoteID)
205                         }
206                         select {
207                         case first <- c:
208                                 return nil
209                         default:
210                                 // lost race, return value doesn't matter
211                                 return nil
212                         }
213                 })
214                 if err != nil {
215                         return arvados.Collection{}, err
216                 }
217                 return <-first, nil
218         }
219 }
220
221 func (conn *Conn) CollectionProvenance(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error) {
222         return conn.chooseBackend(options.UUID).CollectionProvenance(ctx, options)
223 }
224
225 func (conn *Conn) CollectionUsedBy(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error) {
226         return conn.chooseBackend(options.UUID).CollectionUsedBy(ctx, options)
227 }
228
229 func (conn *Conn) CollectionDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error) {
230         return conn.chooseBackend(options.UUID).CollectionDelete(ctx, options)
231 }
232
233 func (conn *Conn) CollectionTrash(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error) {
234         return conn.chooseBackend(options.UUID).CollectionTrash(ctx, options)
235 }
236
237 func (conn *Conn) CollectionUntrash(ctx context.Context, options arvados.UntrashOptions) (arvados.Collection, error) {
238         return conn.chooseBackend(options.UUID).CollectionUntrash(ctx, options)
239 }
240
241 func (conn *Conn) ContainerCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Container, error) {
242         return conn.chooseBackend(options.ClusterID).ContainerCreate(ctx, options)
243 }
244
245 func (conn *Conn) ContainerUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Container, error) {
246         return conn.chooseBackend(options.UUID).ContainerUpdate(ctx, options)
247 }
248
249 func (conn *Conn) ContainerGet(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
250         return conn.chooseBackend(options.UUID).ContainerGet(ctx, options)
251 }
252
253 func (conn *Conn) ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error) {
254         return conn.chooseBackend(options.UUID).ContainerDelete(ctx, options)
255 }
256
257 func (conn *Conn) ContainerLock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
258         return conn.chooseBackend(options.UUID).ContainerLock(ctx, options)
259 }
260
261 func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
262         return conn.chooseBackend(options.UUID).ContainerUnlock(ctx, options)
263 }
264
265 func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
266         return conn.chooseBackend(options.ClusterID).SpecimenCreate(ctx, options)
267 }
268
269 func (conn *Conn) SpecimenUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Specimen, error) {
270         return conn.chooseBackend(options.UUID).SpecimenUpdate(ctx, options)
271 }
272
273 func (conn *Conn) SpecimenGet(ctx context.Context, options arvados.GetOptions) (arvados.Specimen, error) {
274         return conn.chooseBackend(options.UUID).SpecimenGet(ctx, options)
275 }
276
277 func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error) {
278         return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options)
279 }
280
281 func (conn *Conn) APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error) {
282         return conn.chooseBackend(options.UUID).APIClientAuthorizationCurrent(ctx, options)
283 }
284
285 type backend interface{ arvados.API }
286
287 type notFoundError struct{}
288
289 func (notFoundError) HTTPStatus() int { return http.StatusNotFound }
290 func (notFoundError) Error() string   { return "not found" }
291
292 func errStatus(err error) int {
293         if httpErr, ok := err.(interface{ HTTPStatus() int }); ok {
294                 return httpErr.HTTPStatus()
295         } else {
296                 return http.StatusInternalServerError
297         }
298 }