14287: Refactor controller to use strong types in API handlers.
[arvados.git] / lib / controller / federation / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package federation
6
7 import (
8         "context"
9         "crypto/md5"
10         "errors"
11         "fmt"
12         "net/http"
13         "net/url"
14         "regexp"
15         "strings"
16
17         "git.curoverse.com/arvados.git/lib/controller/railsproxy"
18         "git.curoverse.com/arvados.git/lib/controller/rpc"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/auth"
21         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
22 )
23
24 type Interface interface {
25         CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error)
26         CollectionUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Collection, error)
27         CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error)
28         CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error)
29         CollectionDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error)
30         ContainerCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Container, error)
31         ContainerUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Container, error)
32         ContainerGet(ctx context.Context, options arvados.GetOptions) (arvados.Container, error)
33         ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error)
34         ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error)
35         ContainerLock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error)
36         ContainerUnlock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error)
37         SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error)
38         SpecimenUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Specimen, error)
39         SpecimenGet(ctx context.Context, options arvados.GetOptions) (arvados.Specimen, error)
40         SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error)
41         SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error)
42         APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error)
43 }
44
45 type Conn struct {
46         cluster *arvados.Cluster
47         local   backend
48         remotes map[string]backend
49 }
50
51 func New(cluster *arvados.Cluster) Interface {
52         local := railsproxy.NewConn(cluster)
53         remotes := map[string]backend{}
54         for id, remote := range cluster.RemoteClusters {
55                 if !remote.Proxy {
56                         continue
57                 }
58                 remotes[id] = rpc.NewConn(id, &url.URL{Scheme: remote.Scheme, Host: remote.Host}, remote.Insecure, saltedTokenProvider(local, id))
59         }
60
61         return &Conn{
62                 cluster: cluster,
63                 local:   local,
64                 remotes: remotes,
65         }
66 }
67
68 // Return a new rpc.TokenProvider that takes the client-provided
69 // tokens from an incoming request context, determines whether they
70 // should (and can) be salted for the given remoteID, and returns the
71 // resulting tokens.
72 func saltedTokenProvider(local backend, remoteID string) rpc.TokenProvider {
73         return func(ctx context.Context) ([]string, error) {
74                 var tokens []string
75                 incoming, ok := ctx.Value(auth.ContextKeyCredentials).(*auth.Credentials)
76                 if !ok {
77                         return nil, errors.New("no token provided")
78                 }
79                 for _, token := range incoming.Tokens {
80                         salted, err := auth.SaltToken(token, remoteID)
81                         switch err {
82                         case nil:
83                                 tokens = append(tokens, salted)
84                         case auth.ErrSalted:
85                                 tokens = append(tokens, token)
86                         case auth.ErrObsoleteToken:
87                                 ctx := context.WithValue(ctx, auth.ContextKeyCredentials, &auth.Credentials{Tokens: []string{token}})
88                                 aca, err := local.APIClientAuthorizationCurrent(ctx, arvados.GetOptions{})
89                                 if errStatus(err) == http.StatusUnauthorized {
90                                         // pass through unmodified
91                                         tokens = append(tokens, token)
92                                         continue
93                                 } else if err != nil {
94                                         return nil, err
95                                 }
96                                 salted, err := auth.SaltToken(aca.TokenV2(), remoteID)
97                                 if err != nil {
98                                         return nil, err
99                                 }
100                                 tokens = append(tokens, salted)
101                         default:
102                                 return nil, err
103                         }
104                 }
105                 return tokens, nil
106         }
107 }
108
109 // Return suitable backend for a query about the given cluster ID
110 // ("aaaaa") or object UUID ("aaaaa-dz642-abcdefghijklmno").
111 func (conn *Conn) chooseBackend(id string) backend {
112         if len(id) > 5 {
113                 id = id[:5]
114         }
115         if id == conn.cluster.ClusterID {
116                 return conn.local
117         } else if be, ok := conn.remotes[id]; ok {
118                 return be
119         } else {
120                 // TODO: return an "always error" backend?
121                 return conn.local
122         }
123 }
124
125 // Call fn with the local backend; then, if fn returned 404, call fn
126 // on the available remote backends (possibly concurrently) until one
127 // succeeds.
128 //
129 // The second argument to fn is the cluster ID of the remote backend,
130 // or "" for the local backend.
131 //
132 // A non-nil error means all backends failed.
133 func (conn *Conn) tryLocalThenRemotes(ctx context.Context, fn func(context.Context, string, backend) error) error {
134         if err := fn(ctx, "", conn.local); err == nil || errStatus(err) != http.StatusNotFound {
135                 return err
136         }
137
138         ctx, cancel := context.WithCancel(ctx)
139         defer cancel()
140         errchan := make(chan error, len(conn.remotes))
141         for remoteID, be := range conn.remotes {
142                 remoteID, be := remoteID, be
143                 go func() {
144                         errchan <- fn(ctx, remoteID, be)
145                 }()
146         }
147         all404 := true
148         var errs []error
149         for i := 0; i < cap(errchan); i++ {
150                 err := <-errchan
151                 if err == nil {
152                         return nil
153                 }
154                 all404 = all404 && errStatus(err) == http.StatusNotFound
155                 errs = append(errs, err)
156         }
157         if all404 {
158                 return notFoundError{}
159         }
160         // FIXME: choose appropriate HTTP status
161         return fmt.Errorf("errors: %v", errs)
162 }
163
164 func (conn *Conn) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
165         return conn.chooseBackend(options.ClusterID).CollectionCreate(ctx, options)
166 }
167
168 func (conn *Conn) CollectionUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Collection, error) {
169         return conn.chooseBackend(options.UUID).CollectionUpdate(ctx, options)
170 }
171
172 func rewriteManifest(mt, remoteID string) string {
173         return regexp.MustCompile(` [0-9a-f]{32}\+[^ ]*`).ReplaceAllStringFunc(mt, func(tok string) string {
174                 return strings.Replace(tok, "+A", "+R"+remoteID+"-", -1)
175         })
176 }
177
178 // this could be in sdk/go/arvados
179 func portableDataHash(mt string) string {
180         h := md5.New()
181         blkRe := regexp.MustCompile(`^ [0-9a-f]{32}\+\d+`)
182         size := 0
183         _ = regexp.MustCompile(` ?[^ ]*`).ReplaceAllFunc([]byte(mt), func(tok []byte) []byte {
184                 if m := blkRe.Find(tok); m != nil {
185                         // write hash+size, ignore remaining block hints
186                         tok = m
187                 }
188                 n, err := h.Write(tok)
189                 if err != nil {
190                         panic(err)
191                 }
192                 size += n
193                 return nil
194         })
195         return fmt.Sprintf("%x+%d", h.Sum(nil), size)
196 }
197
198 func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error) {
199         if len(options.UUID) == 27 {
200                 // UUID is really a UUID
201                 c, err := conn.chooseBackend(options.UUID).CollectionGet(ctx, options)
202                 if err == nil && options.UUID[:5] != conn.cluster.ClusterID {
203                         c.ManifestText = rewriteManifest(c.ManifestText, options.UUID[:5])
204                 }
205                 return c, err
206         } else {
207                 // UUID is a PDH
208                 first := make(chan arvados.Collection, 1)
209                 err := conn.tryLocalThenRemotes(ctx, func(ctx context.Context, remoteID string, be backend) error {
210                         c, err := be.CollectionGet(ctx, options)
211                         if err != nil {
212                                 return err
213                         }
214                         if pdh := portableDataHash(c.ManifestText); pdh != options.UUID {
215                                 ctxlog.FromContext(ctx).Warnf("bad portable data hash %q received from remote %q (expected %q)", pdh, remoteID, options.UUID)
216                                 return notFoundError{}
217                         }
218                         if remoteID != "" {
219                                 c.ManifestText = rewriteManifest(c.ManifestText, remoteID)
220                         }
221                         select {
222                         case first <- c:
223                                 return nil
224                         default:
225                                 // lost race, return value doesn't matter
226                                 return nil
227                         }
228                 })
229                 if err != nil {
230                         return arvados.Collection{}, err
231                 }
232                 return <-first, nil
233         }
234 }
235
236 func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
237         return conn.local.CollectionList(ctx, options)
238 }
239
240 func (conn *Conn) CollectionDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error) {
241         return conn.chooseBackend(options.UUID).CollectionDelete(ctx, options)
242 }
243
244 func (conn *Conn) ContainerCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Container, error) {
245         return conn.chooseBackend(options.ClusterID).ContainerCreate(ctx, options)
246 }
247
248 func (conn *Conn) ContainerUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Container, error) {
249         return conn.chooseBackend(options.UUID).ContainerUpdate(ctx, options)
250 }
251
252 func (conn *Conn) ContainerGet(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
253         return conn.chooseBackend(options.UUID).ContainerGet(ctx, options)
254 }
255
256 func (conn *Conn) ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error) {
257         return conn.local.ContainerList(ctx, options)
258 }
259
260 func (conn *Conn) ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error) {
261         return conn.chooseBackend(options.UUID).ContainerDelete(ctx, options)
262 }
263
264 func (conn *Conn) ContainerLock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
265         return conn.chooseBackend(options.UUID).ContainerLock(ctx, options)
266 }
267
268 func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
269         return conn.chooseBackend(options.UUID).ContainerUnlock(ctx, options)
270 }
271
272 func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
273         return conn.chooseBackend(options.ClusterID).SpecimenCreate(ctx, options)
274 }
275
276 func (conn *Conn) SpecimenUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Specimen, error) {
277         return conn.chooseBackend(options.UUID).SpecimenUpdate(ctx, options)
278 }
279
280 func (conn *Conn) SpecimenGet(ctx context.Context, options arvados.GetOptions) (arvados.Specimen, error) {
281         return conn.chooseBackend(options.UUID).SpecimenGet(ctx, options)
282 }
283
284 func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
285         return conn.local.SpecimenList(ctx, options)
286 }
287
288 func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error) {
289         return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options)
290 }
291
292 func (conn *Conn) APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error) {
293         return conn.chooseBackend(options.UUID).APIClientAuthorizationCurrent(ctx, options)
294 }
295
296 type backend interface{ Interface }
297
298 type notFoundError struct{}
299
300 func (notFoundError) HTTPStatus() int { return http.StatusNotFound }
301 func (notFoundError) Error() string   { return "not found" }
302
303 func errStatus(err error) int {
304         if httpErr, ok := err.(interface{ HTTPStatus() int }); ok {
305                 return httpErr.HTTPStatus()
306         } else {
307                 return http.StatusInternalServerError
308         }
309 }