14287: Handle collection/.../provenance and .../used_by requests.
[arvados.git] / lib / controller / federation / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package federation
6
7 import (
8         "context"
9         "crypto/md5"
10         "errors"
11         "fmt"
12         "net/http"
13         "net/url"
14         "regexp"
15         "strings"
16
17         "git.curoverse.com/arvados.git/lib/controller/railsproxy"
18         "git.curoverse.com/arvados.git/lib/controller/rpc"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/auth"
21         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
22 )
23
24 type Interface interface {
25         CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error)
26         CollectionUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Collection, error)
27         CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error)
28         CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error)
29         CollectionProvenance(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error)
30         CollectionUsedBy(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error)
31         CollectionDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error)
32         ContainerCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Container, error)
33         ContainerUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Container, error)
34         ContainerGet(ctx context.Context, options arvados.GetOptions) (arvados.Container, error)
35         ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error)
36         ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error)
37         ContainerLock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error)
38         ContainerUnlock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error)
39         SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error)
40         SpecimenUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Specimen, error)
41         SpecimenGet(ctx context.Context, options arvados.GetOptions) (arvados.Specimen, error)
42         SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error)
43         SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error)
44         APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error)
45 }
46
47 type Conn struct {
48         cluster *arvados.Cluster
49         local   backend
50         remotes map[string]backend
51 }
52
53 func New(cluster *arvados.Cluster) Interface {
54         local := railsproxy.NewConn(cluster)
55         remotes := map[string]backend{}
56         for id, remote := range cluster.RemoteClusters {
57                 if !remote.Proxy {
58                         continue
59                 }
60                 remotes[id] = rpc.NewConn(id, &url.URL{Scheme: remote.Scheme, Host: remote.Host}, remote.Insecure, saltedTokenProvider(local, id))
61         }
62
63         return &Conn{
64                 cluster: cluster,
65                 local:   local,
66                 remotes: remotes,
67         }
68 }
69
70 // Return a new rpc.TokenProvider that takes the client-provided
71 // tokens from an incoming request context, determines whether they
72 // should (and can) be salted for the given remoteID, and returns the
73 // resulting tokens.
74 func saltedTokenProvider(local backend, remoteID string) rpc.TokenProvider {
75         return func(ctx context.Context) ([]string, error) {
76                 var tokens []string
77                 incoming, ok := ctx.Value(auth.ContextKeyCredentials).(*auth.Credentials)
78                 if !ok {
79                         return nil, errors.New("no token provided")
80                 }
81                 for _, token := range incoming.Tokens {
82                         salted, err := auth.SaltToken(token, remoteID)
83                         switch err {
84                         case nil:
85                                 tokens = append(tokens, salted)
86                         case auth.ErrSalted:
87                                 tokens = append(tokens, token)
88                         case auth.ErrObsoleteToken:
89                                 ctx := context.WithValue(ctx, auth.ContextKeyCredentials, &auth.Credentials{Tokens: []string{token}})
90                                 aca, err := local.APIClientAuthorizationCurrent(ctx, arvados.GetOptions{})
91                                 if errStatus(err) == http.StatusUnauthorized {
92                                         // pass through unmodified
93                                         tokens = append(tokens, token)
94                                         continue
95                                 } else if err != nil {
96                                         return nil, err
97                                 }
98                                 salted, err := auth.SaltToken(aca.TokenV2(), remoteID)
99                                 if err != nil {
100                                         return nil, err
101                                 }
102                                 tokens = append(tokens, salted)
103                         default:
104                                 return nil, err
105                         }
106                 }
107                 return tokens, nil
108         }
109 }
110
111 // Return suitable backend for a query about the given cluster ID
112 // ("aaaaa") or object UUID ("aaaaa-dz642-abcdefghijklmno").
113 func (conn *Conn) chooseBackend(id string) backend {
114         if len(id) > 5 {
115                 id = id[:5]
116         }
117         if id == conn.cluster.ClusterID {
118                 return conn.local
119         } else if be, ok := conn.remotes[id]; ok {
120                 return be
121         } else {
122                 // TODO: return an "always error" backend?
123                 return conn.local
124         }
125 }
126
127 // Call fn with the local backend; then, if fn returned 404, call fn
128 // on the available remote backends (possibly concurrently) until one
129 // succeeds.
130 //
131 // The second argument to fn is the cluster ID of the remote backend,
132 // or "" for the local backend.
133 //
134 // A non-nil error means all backends failed.
135 func (conn *Conn) tryLocalThenRemotes(ctx context.Context, fn func(context.Context, string, backend) error) error {
136         if err := fn(ctx, "", conn.local); err == nil || errStatus(err) != http.StatusNotFound {
137                 return err
138         }
139
140         ctx, cancel := context.WithCancel(ctx)
141         defer cancel()
142         errchan := make(chan error, len(conn.remotes))
143         for remoteID, be := range conn.remotes {
144                 remoteID, be := remoteID, be
145                 go func() {
146                         errchan <- fn(ctx, remoteID, be)
147                 }()
148         }
149         all404 := true
150         var errs []error
151         for i := 0; i < cap(errchan); i++ {
152                 err := <-errchan
153                 if err == nil {
154                         return nil
155                 }
156                 all404 = all404 && errStatus(err) == http.StatusNotFound
157                 errs = append(errs, err)
158         }
159         if all404 {
160                 return notFoundError{}
161         }
162         // FIXME: choose appropriate HTTP status
163         return fmt.Errorf("errors: %v", errs)
164 }
165
166 func (conn *Conn) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
167         return conn.chooseBackend(options.ClusterID).CollectionCreate(ctx, options)
168 }
169
170 func (conn *Conn) CollectionUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Collection, error) {
171         return conn.chooseBackend(options.UUID).CollectionUpdate(ctx, options)
172 }
173
174 func rewriteManifest(mt, remoteID string) string {
175         return regexp.MustCompile(` [0-9a-f]{32}\+[^ ]*`).ReplaceAllStringFunc(mt, func(tok string) string {
176                 return strings.Replace(tok, "+A", "+R"+remoteID+"-", -1)
177         })
178 }
179
180 // this could be in sdk/go/arvados
181 func portableDataHash(mt string) string {
182         h := md5.New()
183         blkRe := regexp.MustCompile(`^ [0-9a-f]{32}\+\d+`)
184         size := 0
185         _ = regexp.MustCompile(` ?[^ ]*`).ReplaceAllFunc([]byte(mt), func(tok []byte) []byte {
186                 if m := blkRe.Find(tok); m != nil {
187                         // write hash+size, ignore remaining block hints
188                         tok = m
189                 }
190                 n, err := h.Write(tok)
191                 if err != nil {
192                         panic(err)
193                 }
194                 size += n
195                 return nil
196         })
197         return fmt.Sprintf("%x+%d", h.Sum(nil), size)
198 }
199
200 func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error) {
201         if len(options.UUID) == 27 {
202                 // UUID is really a UUID
203                 c, err := conn.chooseBackend(options.UUID).CollectionGet(ctx, options)
204                 if err == nil && options.UUID[:5] != conn.cluster.ClusterID {
205                         c.ManifestText = rewriteManifest(c.ManifestText, options.UUID[:5])
206                 }
207                 return c, err
208         } else {
209                 // UUID is a PDH
210                 first := make(chan arvados.Collection, 1)
211                 err := conn.tryLocalThenRemotes(ctx, func(ctx context.Context, remoteID string, be backend) error {
212                         c, err := be.CollectionGet(ctx, options)
213                         if err != nil {
214                                 return err
215                         }
216                         // options.UUID is either hash+size or
217                         // hash+size+hints; only hash+size need to
218                         // match the computed PDH.
219                         if pdh := portableDataHash(c.ManifestText); pdh != options.UUID && !strings.HasPrefix(options.UUID, pdh+"+") {
220                                 ctxlog.FromContext(ctx).Warnf("bad portable data hash %q received from remote %q (expected %q)", pdh, remoteID, options.UUID)
221                                 return notFoundError{}
222                         }
223                         if remoteID != "" {
224                                 c.ManifestText = rewriteManifest(c.ManifestText, remoteID)
225                         }
226                         select {
227                         case first <- c:
228                                 return nil
229                         default:
230                                 // lost race, return value doesn't matter
231                                 return nil
232                         }
233                 })
234                 if err != nil {
235                         return arvados.Collection{}, err
236                 }
237                 return <-first, nil
238         }
239 }
240
241 func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
242         return conn.local.CollectionList(ctx, options)
243 }
244
245 func (conn *Conn) CollectionProvenance(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error) {
246         return conn.local.CollectionProvenance(ctx, options)
247 }
248
249 func (conn *Conn) CollectionUsedBy(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error) {
250         return conn.local.CollectionUsedBy(ctx, options)
251 }
252
253 func (conn *Conn) CollectionDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error) {
254         return conn.chooseBackend(options.UUID).CollectionDelete(ctx, options)
255 }
256
257 func (conn *Conn) ContainerCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Container, error) {
258         return conn.chooseBackend(options.ClusterID).ContainerCreate(ctx, options)
259 }
260
261 func (conn *Conn) ContainerUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Container, error) {
262         return conn.chooseBackend(options.UUID).ContainerUpdate(ctx, options)
263 }
264
265 func (conn *Conn) ContainerGet(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
266         return conn.chooseBackend(options.UUID).ContainerGet(ctx, options)
267 }
268
269 func (conn *Conn) ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error) {
270         return conn.local.ContainerList(ctx, options)
271 }
272
273 func (conn *Conn) ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error) {
274         return conn.chooseBackend(options.UUID).ContainerDelete(ctx, options)
275 }
276
277 func (conn *Conn) ContainerLock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
278         return conn.chooseBackend(options.UUID).ContainerLock(ctx, options)
279 }
280
281 func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
282         return conn.chooseBackend(options.UUID).ContainerUnlock(ctx, options)
283 }
284
285 func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
286         return conn.chooseBackend(options.ClusterID).SpecimenCreate(ctx, options)
287 }
288
289 func (conn *Conn) SpecimenUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Specimen, error) {
290         return conn.chooseBackend(options.UUID).SpecimenUpdate(ctx, options)
291 }
292
293 func (conn *Conn) SpecimenGet(ctx context.Context, options arvados.GetOptions) (arvados.Specimen, error) {
294         return conn.chooseBackend(options.UUID).SpecimenGet(ctx, options)
295 }
296
297 func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
298         return conn.local.SpecimenList(ctx, options)
299 }
300
301 func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error) {
302         return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options)
303 }
304
305 func (conn *Conn) APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error) {
306         return conn.chooseBackend(options.UUID).APIClientAuthorizationCurrent(ctx, options)
307 }
308
309 type backend interface{ Interface }
310
311 type notFoundError struct{}
312
313 func (notFoundError) HTTPStatus() int { return http.StatusNotFound }
314 func (notFoundError) Error() string   { return "not found" }
315
316 func errStatus(err error) int {
317         if httpErr, ok := err.(interface{ HTTPStatus() int }); ok {
318                 return httpErr.HTTPStatus()
319         } else {
320                 return http.StatusInternalServerError
321         }
322 }