14287: Ensure timestamps in responses have 9 digits of nanoseconds.
[arvados.git] / lib / controller / federation / conn.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package federation
6
7 import (
8         "context"
9         "crypto/md5"
10         "errors"
11         "fmt"
12         "net/http"
13         "net/url"
14         "regexp"
15         "strings"
16
17         "git.curoverse.com/arvados.git/lib/controller/railsproxy"
18         "git.curoverse.com/arvados.git/lib/controller/rpc"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "git.curoverse.com/arvados.git/sdk/go/auth"
21         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
22 )
23
24 type Interface interface {
25         CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error)
26         CollectionUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Collection, error)
27         CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error)
28         CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error)
29         CollectionDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error)
30         ContainerCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Container, error)
31         ContainerUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Container, error)
32         ContainerGet(ctx context.Context, options arvados.GetOptions) (arvados.Container, error)
33         ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error)
34         ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error)
35         ContainerLock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error)
36         ContainerUnlock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error)
37         SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error)
38         SpecimenUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Specimen, error)
39         SpecimenGet(ctx context.Context, options arvados.GetOptions) (arvados.Specimen, error)
40         SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error)
41         SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error)
42         APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error)
43 }
44
45 type Conn struct {
46         cluster *arvados.Cluster
47         local   backend
48         remotes map[string]backend
49 }
50
51 func New(cluster *arvados.Cluster) Interface {
52         local := railsproxy.NewConn(cluster)
53         remotes := map[string]backend{}
54         for id, remote := range cluster.RemoteClusters {
55                 if !remote.Proxy {
56                         continue
57                 }
58                 remotes[id] = rpc.NewConn(id, &url.URL{Scheme: remote.Scheme, Host: remote.Host}, remote.Insecure, saltedTokenProvider(local, id))
59         }
60
61         return &Conn{
62                 cluster: cluster,
63                 local:   local,
64                 remotes: remotes,
65         }
66 }
67
68 // Return a new rpc.TokenProvider that takes the client-provided
69 // tokens from an incoming request context, determines whether they
70 // should (and can) be salted for the given remoteID, and returns the
71 // resulting tokens.
72 func saltedTokenProvider(local backend, remoteID string) rpc.TokenProvider {
73         return func(ctx context.Context) ([]string, error) {
74                 var tokens []string
75                 incoming, ok := ctx.Value(auth.ContextKeyCredentials).(*auth.Credentials)
76                 if !ok {
77                         return nil, errors.New("no token provided")
78                 }
79                 for _, token := range incoming.Tokens {
80                         salted, err := auth.SaltToken(token, remoteID)
81                         switch err {
82                         case nil:
83                                 tokens = append(tokens, salted)
84                         case auth.ErrSalted:
85                                 tokens = append(tokens, token)
86                         case auth.ErrObsoleteToken:
87                                 ctx := context.WithValue(ctx, auth.ContextKeyCredentials, &auth.Credentials{Tokens: []string{token}})
88                                 aca, err := local.APIClientAuthorizationCurrent(ctx, arvados.GetOptions{})
89                                 if errStatus(err) == http.StatusUnauthorized {
90                                         // pass through unmodified
91                                         tokens = append(tokens, token)
92                                         continue
93                                 } else if err != nil {
94                                         return nil, err
95                                 }
96                                 salted, err := auth.SaltToken(aca.TokenV2(), remoteID)
97                                 if err != nil {
98                                         return nil, err
99                                 }
100                                 tokens = append(tokens, salted)
101                         default:
102                                 return nil, err
103                         }
104                 }
105                 return tokens, nil
106         }
107 }
108
109 // Return suitable backend for a query about the given cluster ID
110 // ("aaaaa") or object UUID ("aaaaa-dz642-abcdefghijklmno").
111 func (conn *Conn) chooseBackend(id string) backend {
112         if len(id) > 5 {
113                 id = id[:5]
114         }
115         if id == conn.cluster.ClusterID {
116                 return conn.local
117         } else if be, ok := conn.remotes[id]; ok {
118                 return be
119         } else {
120                 // TODO: return an "always error" backend?
121                 return conn.local
122         }
123 }
124
125 // Call fn with the local backend; then, if fn returned 404, call fn
126 // on the available remote backends (possibly concurrently) until one
127 // succeeds.
128 //
129 // The second argument to fn is the cluster ID of the remote backend,
130 // or "" for the local backend.
131 //
132 // A non-nil error means all backends failed.
133 func (conn *Conn) tryLocalThenRemotes(ctx context.Context, fn func(context.Context, string, backend) error) error {
134         if err := fn(ctx, "", conn.local); err == nil || errStatus(err) != http.StatusNotFound {
135                 return err
136         }
137
138         ctx, cancel := context.WithCancel(ctx)
139         defer cancel()
140         errchan := make(chan error, len(conn.remotes))
141         for remoteID, be := range conn.remotes {
142                 remoteID, be := remoteID, be
143                 go func() {
144                         errchan <- fn(ctx, remoteID, be)
145                 }()
146         }
147         all404 := true
148         var errs []error
149         for i := 0; i < cap(errchan); i++ {
150                 err := <-errchan
151                 if err == nil {
152                         return nil
153                 }
154                 all404 = all404 && errStatus(err) == http.StatusNotFound
155                 errs = append(errs, err)
156         }
157         if all404 {
158                 return notFoundError{}
159         }
160         // FIXME: choose appropriate HTTP status
161         return fmt.Errorf("errors: %v", errs)
162 }
163
164 func (conn *Conn) CollectionCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Collection, error) {
165         return conn.chooseBackend(options.ClusterID).CollectionCreate(ctx, options)
166 }
167
168 func (conn *Conn) CollectionUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Collection, error) {
169         return conn.chooseBackend(options.UUID).CollectionUpdate(ctx, options)
170 }
171
172 func rewriteManifest(mt, remoteID string) string {
173         return regexp.MustCompile(` [0-9a-f]{32}\+[^ ]*`).ReplaceAllStringFunc(mt, func(tok string) string {
174                 return strings.Replace(tok, "+A", "+R"+remoteID+"-", -1)
175         })
176 }
177
178 // this could be in sdk/go/arvados
179 func portableDataHash(mt string) string {
180         h := md5.New()
181         blkRe := regexp.MustCompile(`^ [0-9a-f]{32}\+\d+`)
182         size := 0
183         _ = regexp.MustCompile(` ?[^ ]*`).ReplaceAllFunc([]byte(mt), func(tok []byte) []byte {
184                 if m := blkRe.Find(tok); m != nil {
185                         // write hash+size, ignore remaining block hints
186                         tok = m
187                 }
188                 n, err := h.Write(tok)
189                 if err != nil {
190                         panic(err)
191                 }
192                 size += n
193                 return nil
194         })
195         return fmt.Sprintf("%x+%d", h.Sum(nil), size)
196 }
197
198 func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error) {
199         if len(options.UUID) == 27 {
200                 // UUID is really a UUID
201                 c, err := conn.chooseBackend(options.UUID).CollectionGet(ctx, options)
202                 if err == nil && options.UUID[:5] != conn.cluster.ClusterID {
203                         c.ManifestText = rewriteManifest(c.ManifestText, options.UUID[:5])
204                 }
205                 return c, err
206         } else {
207                 // UUID is a PDH
208                 first := make(chan arvados.Collection, 1)
209                 err := conn.tryLocalThenRemotes(ctx, func(ctx context.Context, remoteID string, be backend) error {
210                         c, err := be.CollectionGet(ctx, options)
211                         if err != nil {
212                                 return err
213                         }
214                         // options.UUID is either hash+size or
215                         // hash+size+hints; only hash+size need to
216                         // match the computed PDH.
217                         if pdh := portableDataHash(c.ManifestText); pdh != options.UUID && !strings.HasPrefix(options.UUID, pdh+"+") {
218                                 ctxlog.FromContext(ctx).Warnf("bad portable data hash %q received from remote %q (expected %q)", pdh, remoteID, options.UUID)
219                                 return notFoundError{}
220                         }
221                         if remoteID != "" {
222                                 c.ManifestText = rewriteManifest(c.ManifestText, remoteID)
223                         }
224                         select {
225                         case first <- c:
226                                 return nil
227                         default:
228                                 // lost race, return value doesn't matter
229                                 return nil
230                         }
231                 })
232                 if err != nil {
233                         return arvados.Collection{}, err
234                 }
235                 return <-first, nil
236         }
237 }
238
239 func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
240         return conn.local.CollectionList(ctx, options)
241 }
242
243 func (conn *Conn) CollectionDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Collection, error) {
244         return conn.chooseBackend(options.UUID).CollectionDelete(ctx, options)
245 }
246
247 func (conn *Conn) ContainerCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Container, error) {
248         return conn.chooseBackend(options.ClusterID).ContainerCreate(ctx, options)
249 }
250
251 func (conn *Conn) ContainerUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Container, error) {
252         return conn.chooseBackend(options.UUID).ContainerUpdate(ctx, options)
253 }
254
255 func (conn *Conn) ContainerGet(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
256         return conn.chooseBackend(options.UUID).ContainerGet(ctx, options)
257 }
258
259 func (conn *Conn) ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error) {
260         return conn.local.ContainerList(ctx, options)
261 }
262
263 func (conn *Conn) ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error) {
264         return conn.chooseBackend(options.UUID).ContainerDelete(ctx, options)
265 }
266
267 func (conn *Conn) ContainerLock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
268         return conn.chooseBackend(options.UUID).ContainerLock(ctx, options)
269 }
270
271 func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOptions) (arvados.Container, error) {
272         return conn.chooseBackend(options.UUID).ContainerUnlock(ctx, options)
273 }
274
275 func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
276         return conn.chooseBackend(options.ClusterID).SpecimenCreate(ctx, options)
277 }
278
279 func (conn *Conn) SpecimenUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Specimen, error) {
280         return conn.chooseBackend(options.UUID).SpecimenUpdate(ctx, options)
281 }
282
283 func (conn *Conn) SpecimenGet(ctx context.Context, options arvados.GetOptions) (arvados.Specimen, error) {
284         return conn.chooseBackend(options.UUID).SpecimenGet(ctx, options)
285 }
286
287 func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
288         return conn.local.SpecimenList(ctx, options)
289 }
290
291 func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error) {
292         return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options)
293 }
294
295 func (conn *Conn) APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error) {
296         return conn.chooseBackend(options.UUID).APIClientAuthorizationCurrent(ctx, options)
297 }
298
299 type backend interface{ Interface }
300
301 type notFoundError struct{}
302
303 func (notFoundError) HTTPStatus() int { return http.StatusNotFound }
304 func (notFoundError) Error() string   { return "not found" }
305
306 func errStatus(err error) int {
307         if httpErr, ok := err.(interface{ HTTPStatus() int }); ok {
308                 return httpErr.HTTPStatus()
309         } else {
310                 return http.StatusInternalServerError
311         }
312 }