]> git.arvados.org - arvados.git/blob - lib/controller/localdb/collection.go
22958: Add missing `become`
[arvados.git] / lib / controller / localdb / collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "fmt"
10         "net/http"
11         "os"
12         "sort"
13         "strings"
14         "time"
15
16         "git.arvados.org/arvados.git/lib/ctrlctx"
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19         "git.arvados.org/arvados.git/sdk/go/auth"
20         "git.arvados.org/arvados.git/sdk/go/httpserver"
21 )
22
23 // CollectionGet defers to railsProxy for everything except blob
24 // signatures.
25 func (conn *Conn) CollectionGet(ctx context.Context, opts arvados.GetOptions) (arvados.Collection, error) {
26         conn.logActivity(ctx)
27         if len(opts.Select) > 0 {
28                 // We need to know IsTrashed and TrashAt to implement
29                 // signing properly, even if the caller doesn't want
30                 // them.
31                 opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
32         }
33         resp, err := conn.railsProxy.CollectionGet(ctx, opts)
34         if err != nil {
35                 return resp, err
36         }
37         conn.signCollection(ctx, &resp)
38         return resp, nil
39 }
40
41 // CollectionList defers to railsProxy for everything except blob
42 // signatures.
43 func (conn *Conn) CollectionList(ctx context.Context, opts arvados.ListOptions) (arvados.CollectionList, error) {
44         conn.logActivity(ctx)
45         if len(opts.Select) > 0 {
46                 // We need to know IsTrashed and TrashAt to implement
47                 // signing properly, even if the caller doesn't want
48                 // them.
49                 opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
50         }
51         resp, err := conn.railsProxy.CollectionList(ctx, opts)
52         if err != nil {
53                 return resp, err
54         }
55         for i := range resp.Items {
56                 conn.signCollection(ctx, &resp.Items[i])
57         }
58         return resp, nil
59 }
60
61 // CollectionCreate defers to railsProxy for everything except blob
62 // signatures and vocabulary checking.
63 func (conn *Conn) CollectionCreate(ctx context.Context, opts arvados.CreateOptions) (arvados.Collection, error) {
64         conn.logActivity(ctx)
65         err := conn.checkProperties(ctx, opts.Attrs["properties"])
66         if err != nil {
67                 return arvados.Collection{}, err
68         }
69         if len(opts.Select) > 0 {
70                 // We need to know IsTrashed and TrashAt to implement
71                 // signing properly, even if the caller doesn't want
72                 // them.
73                 opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
74         }
75         if opts.Attrs, err = conn.applyReplaceFilesOption(ctx, "", opts.Attrs, opts.ReplaceFiles); err != nil {
76                 return arvados.Collection{}, err
77         }
78         if opts.Attrs, err = conn.applyReplaceSegmentsOption(ctx, "", opts.Attrs, opts.ReplaceSegments); err != nil {
79                 return arvados.Collection{}, err
80         }
81         resp, err := conn.railsProxy.CollectionCreate(ctx, opts)
82         if err != nil {
83                 return resp, err
84         }
85         conn.signCollection(ctx, &resp)
86         return resp, nil
87 }
88
89 // CollectionUpdate defers to railsProxy for everything except blob
90 // signatures and vocabulary checking.
91 func (conn *Conn) CollectionUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Collection, error) {
92         conn.logActivity(ctx)
93         err := conn.checkProperties(ctx, opts.Attrs["properties"])
94         if err != nil {
95                 return arvados.Collection{}, err
96         }
97         if len(opts.Select) > 0 {
98                 // We need to know IsTrashed and TrashAt to implement
99                 // signing properly, even if the caller doesn't want
100                 // them.
101                 opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
102         }
103         err = conn.lockUUID(ctx, opts.UUID)
104         if err != nil {
105                 return arvados.Collection{}, err
106         }
107         if opts.Attrs, err = conn.applyReplaceFilesOption(ctx, opts.UUID, opts.Attrs, opts.ReplaceFiles); err != nil {
108                 return arvados.Collection{}, err
109         }
110         if opts.Attrs, err = conn.applyReplaceSegmentsOption(ctx, opts.UUID, opts.Attrs, opts.ReplaceSegments); err != nil {
111                 return arvados.Collection{}, err
112         }
113         resp, err := conn.railsProxy.CollectionUpdate(ctx, opts)
114         if err != nil {
115                 return resp, err
116         }
117         conn.signCollection(ctx, &resp)
118         return resp, nil
119 }
120
121 func (conn *Conn) signCollection(ctx context.Context, coll *arvados.Collection) {
122         if coll.IsTrashed || coll.ManifestText == "" || !conn.cluster.Collections.BlobSigning {
123                 return
124         }
125         var token string
126         if creds, ok := auth.FromContext(ctx); ok && len(creds.Tokens) > 0 {
127                 token = creds.Tokens[0]
128         }
129         if token == "" {
130                 return
131         }
132         ttl := conn.cluster.Collections.BlobSigningTTL.Duration()
133         exp := time.Now().Add(ttl)
134         if coll.TrashAt != nil && !coll.TrashAt.IsZero() && coll.TrashAt.Before(exp) {
135                 exp = *coll.TrashAt
136         }
137         coll.ManifestText = arvados.SignManifest(coll.ManifestText, token, exp, ttl, []byte(conn.cluster.Collections.BlobSigningKey))
138 }
139
140 func (conn *Conn) lockUUID(ctx context.Context, uuid string) error {
141         tx, err := ctrlctx.CurrentTx(ctx)
142         if err != nil {
143                 return err
144         }
145         _, err = tx.ExecContext(ctx, `insert into uuid_locks (uuid) values ($1) on conflict (uuid) do update set n=uuid_locks.n+1`, uuid)
146         if err != nil {
147                 return err
148         }
149         return nil
150 }
151
152 // If replaceFiles is non-empty, populate attrs["manifest_text"] by
153 // starting with the content of fromUUID (or an empty collection if
154 // fromUUID is empty) and applying the specified file/directory
155 // replacements.
156 //
157 // Return value is the (possibly modified) attrs map.
158 func (conn *Conn) applyReplaceFilesOption(ctx context.Context, fromUUID string, attrs map[string]interface{}, replaceFiles map[string]string) (map[string]interface{}, error) {
159         if len(replaceFiles) == 0 {
160                 return attrs, nil
161         }
162
163         providedManifestText, _ := attrs["manifest_text"].(string)
164         if providedManifestText != "" {
165                 used := false
166                 for _, src := range replaceFiles {
167                         if strings.HasPrefix(src, "manifest_text/") {
168                                 used = true
169                                 break
170                         }
171                 }
172                 if !used {
173                         return nil, httpserver.Errorf(http.StatusBadRequest, "invalid request: attrs['manifest_text'] was provided, but would not be used because it is not referenced by any 'replace_files' entry")
174                 }
175         }
176
177         // Load the current collection (if any) and set up an
178         // in-memory filesystem.
179         var dst arvados.Collection
180         if _, replacingRoot := replaceFiles["/"]; !replacingRoot && fromUUID != "" {
181                 src, err := conn.CollectionGet(ctx, arvados.GetOptions{UUID: fromUUID})
182                 if err != nil {
183                         return nil, err
184                 }
185                 dst = src
186         }
187         dstfs, err := dst.FileSystem(&arvados.StubClient{}, &arvados.StubClient{})
188         if err != nil {
189                 return nil, err
190         }
191
192         // Sort replacements by source collection to avoid redundant
193         // reloads when a source collection is used more than
194         // once. Note empty sources (which mean "delete target path")
195         // sort first.
196         dstTodo := make([]string, 0, len(replaceFiles))
197         {
198                 srcid := make(map[string]string, len(replaceFiles))
199                 for dst, src := range replaceFiles {
200                         dstTodo = append(dstTodo, dst)
201                         if i := strings.IndexRune(src, '/'); i > 0 {
202                                 srcid[dst] = src[:i]
203                         }
204                 }
205                 sort.Slice(dstTodo, func(i, j int) bool {
206                         return srcid[dstTodo[i]] < srcid[dstTodo[j]]
207                 })
208         }
209
210         // Reject attempt to replace a node as well as its descendant
211         // (e.g., a/ and a/b/), which is unsupported, except where the
212         // source for a/ is empty (i.e., delete).
213         for _, dst := range dstTodo {
214                 if dst != "/" && (strings.HasSuffix(dst, "/") ||
215                         strings.HasSuffix(dst, "/.") ||
216                         strings.HasSuffix(dst, "/..") ||
217                         strings.Contains(dst, "//") ||
218                         strings.Contains(dst, "/./") ||
219                         strings.Contains(dst, "/../") ||
220                         !strings.HasPrefix(dst, "/")) {
221                         return nil, httpserver.Errorf(http.StatusBadRequest, "invalid replace_files target: %q", dst)
222                 }
223                 for i := 0; i < len(dst)-1; i++ {
224                         if dst[i] != '/' {
225                                 continue
226                         }
227                         outerdst := dst[:i]
228                         if outerdst == "" {
229                                 outerdst = "/"
230                         }
231                         if outersrc := replaceFiles[outerdst]; outersrc != "" {
232                                 return nil, httpserver.Errorf(http.StatusBadRequest, "replace_files: cannot operate on target %q inside non-empty target %q", dst, outerdst)
233                         }
234                 }
235         }
236
237         current := make(map[string]*arvados.Subtree)
238         // Check whether any sources are "current/...", and if so,
239         // populate current with the relevant snapshot.  Doing this
240         // ahead of time, before making any modifications to dstfs
241         // below, ensures that even instructions like {/a: current/b,
242         // b: current/a} will be handled correctly.
243         for _, src := range replaceFiles {
244                 if strings.HasPrefix(src, "current/") && current[src] == nil {
245                         current[src], err = arvados.Snapshot(dstfs, src[8:])
246                         if os.IsNotExist(err) {
247                                 return nil, httpserver.Errorf(http.StatusBadRequest, "replace_files: nonexistent source %q", src)
248                         } else if err != nil {
249                                 return nil, fmt.Errorf("%s: %w", src, err)
250                         }
251                 }
252         }
253
254         var srcidloaded string
255         var srcfs arvados.FileSystem
256         // Apply the requested replacements.
257         for _, dst := range dstTodo {
258                 src := replaceFiles[dst]
259                 if src == "" {
260                         if dst == "/" {
261                                 // In this case we started with a
262                                 // blank manifest, so there can't be
263                                 // anything to delete.
264                                 continue
265                         }
266                         err := dstfs.RemoveAll(dst)
267                         if err != nil {
268                                 return nil, fmt.Errorf("RemoveAll(%s): %w", dst, err)
269                         }
270                         continue
271                 }
272                 var snap *arvados.Subtree
273                 srcspec := strings.SplitN(src, "/", 2)
274                 srcid, srcpath := srcspec[0], "/"
275                 if len(srcspec) == 2 && srcspec[1] != "" {
276                         srcpath = srcspec[1]
277                 }
278                 switch {
279                 case srcid == "current":
280                         snap = current[src]
281                         if snap == nil {
282                                 return nil, fmt.Errorf("internal error: current[%s] == nil", src)
283                         }
284                 case srcid == "manifest_text":
285                         if srcidloaded == srcid {
286                                 break
287                         }
288                         srcfs = nil
289                         srccoll := &arvados.Collection{ManifestText: providedManifestText}
290                         srcfs, err = srccoll.FileSystem(&arvados.StubClient{}, &arvados.StubClient{})
291                         if err != nil {
292                                 return nil, err
293                         }
294                         srcidloaded = srcid
295                 case arvadosclient.PDHMatch(srcid):
296                         if srcidloaded == srcid {
297                                 break
298                         }
299                         srcfs = nil
300                         srccoll, err := conn.CollectionGet(ctx, arvados.GetOptions{UUID: srcid})
301                         if err != nil {
302                                 return nil, err
303                         }
304                         // We use StubClient here because we don't
305                         // want srcfs to read/write any file data or
306                         // sync collection state to/from the database.
307                         srcfs, err = srccoll.FileSystem(&arvados.StubClient{}, &arvados.StubClient{})
308                         if err != nil {
309                                 return nil, err
310                         }
311                         srcidloaded = srcid
312                 default:
313                         return nil, httpserver.Errorf(http.StatusBadRequest, "invalid source %q for replace_files[%q]: must be \"\" or \"SRC\" or \"SRC/path\" where SRC is \"current\", \"manifest_text\", or a portable data hash", src, dst)
314                 }
315                 if snap == nil {
316                         snap, err = arvados.Snapshot(srcfs, srcpath)
317                         if err != nil {
318                                 return nil, httpserver.Errorf(http.StatusBadRequest, "error getting snapshot of %q from %q: %w", srcpath, srcid, err)
319                         }
320                 }
321                 // Create intermediate dirs, in case dst is
322                 // "newdir1/newdir2/dst".
323                 for i := 1; i < len(dst)-1; i++ {
324                         if dst[i] == '/' {
325                                 err = dstfs.Mkdir(dst[:i], 0777)
326                                 if err != nil && !os.IsExist(err) {
327                                         return nil, httpserver.Errorf(http.StatusBadRequest, "error creating parent dirs for %q: %w", dst, err)
328                                 }
329                         }
330                 }
331                 err = arvados.Splice(dstfs, dst, snap)
332                 if err != nil {
333                         return nil, fmt.Errorf("error splicing snapshot onto path %q: %w", dst, err)
334                 }
335         }
336         mtxt, err := dstfs.MarshalManifest(".")
337         if err != nil {
338                 return nil, err
339         }
340         if attrs == nil {
341                 attrs = make(map[string]interface{}, 1)
342         }
343         attrs["manifest_text"] = mtxt
344         return attrs, nil
345 }
346
347 func (conn *Conn) applyReplaceSegmentsOption(ctx context.Context, fromUUID string, attrs map[string]interface{}, replaceSegments map[arvados.BlockSegment]arvados.BlockSegment) (map[string]interface{}, error) {
348         if len(replaceSegments) == 0 {
349                 return attrs, nil
350         }
351
352         // Load the current collection content (unless it's being
353         // replaced by the provided manifest_text).
354         var dst arvados.Collection
355         if txt, ok := attrs["manifest_text"].(string); ok {
356                 dst.ManifestText = txt
357         } else if fromUUID != "" {
358                 src, err := conn.CollectionGet(ctx, arvados.GetOptions{UUID: fromUUID})
359                 if err != nil {
360                         return nil, err
361                 }
362                 dst = src
363         }
364         dstfs, err := dst.FileSystem(&arvados.StubClient{}, &arvados.StubClient{})
365         if err != nil {
366                 return nil, err
367         }
368         if changed, err := dstfs.ReplaceSegments(replaceSegments); err != nil {
369                 return nil, httpserver.Errorf(http.StatusBadRequest, "replace_segments: %s", err)
370         } else if changed {
371                 txt, err := dstfs.MarshalManifest(".")
372                 if err != nil {
373                         return nil, err
374                 }
375                 if attrs == nil {
376                         attrs = make(map[string]interface{})
377                 }
378                 attrs["manifest_text"] = txt
379         }
380         return attrs, nil
381 }