Merge branch '12684-pysdk-auto-retry'
[arvados.git] / lib / controller / localdb / collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "fmt"
10         "net/http"
11         "os"
12         "sort"
13         "strings"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
18         "git.arvados.org/arvados.git/sdk/go/auth"
19         "git.arvados.org/arvados.git/sdk/go/httpserver"
20 )
21
22 // CollectionGet defers to railsProxy for everything except blob
23 // signatures.
24 func (conn *Conn) CollectionGet(ctx context.Context, opts arvados.GetOptions) (arvados.Collection, error) {
25         conn.logActivity(ctx)
26         if len(opts.Select) > 0 {
27                 // We need to know IsTrashed and TrashAt to implement
28                 // signing properly, even if the caller doesn't want
29                 // them.
30                 opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
31         }
32         resp, err := conn.railsProxy.CollectionGet(ctx, opts)
33         if err != nil {
34                 return resp, err
35         }
36         conn.signCollection(ctx, &resp)
37         return resp, nil
38 }
39
40 // CollectionList defers to railsProxy for everything except blob
41 // signatures.
42 func (conn *Conn) CollectionList(ctx context.Context, opts arvados.ListOptions) (arvados.CollectionList, error) {
43         conn.logActivity(ctx)
44         if len(opts.Select) > 0 {
45                 // We need to know IsTrashed and TrashAt to implement
46                 // signing properly, even if the caller doesn't want
47                 // them.
48                 opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
49         }
50         resp, err := conn.railsProxy.CollectionList(ctx, opts)
51         if err != nil {
52                 return resp, err
53         }
54         for i := range resp.Items {
55                 conn.signCollection(ctx, &resp.Items[i])
56         }
57         return resp, nil
58 }
59
60 // CollectionCreate defers to railsProxy for everything except blob
61 // signatures and vocabulary checking.
62 func (conn *Conn) CollectionCreate(ctx context.Context, opts arvados.CreateOptions) (arvados.Collection, error) {
63         conn.logActivity(ctx)
64         err := conn.checkProperties(ctx, opts.Attrs["properties"])
65         if err != nil {
66                 return arvados.Collection{}, err
67         }
68         if len(opts.Select) > 0 {
69                 // We need to know IsTrashed and TrashAt to implement
70                 // signing properly, even if the caller doesn't want
71                 // them.
72                 opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
73         }
74         if opts.Attrs, err = conn.applyReplaceFilesOption(ctx, "", opts.Attrs, opts.ReplaceFiles); err != nil {
75                 return arvados.Collection{}, err
76         }
77         resp, err := conn.railsProxy.CollectionCreate(ctx, opts)
78         if err != nil {
79                 return resp, err
80         }
81         conn.signCollection(ctx, &resp)
82         return resp, nil
83 }
84
85 // CollectionUpdate defers to railsProxy for everything except blob
86 // signatures and vocabulary checking.
87 func (conn *Conn) CollectionUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Collection, error) {
88         conn.logActivity(ctx)
89         err := conn.checkProperties(ctx, opts.Attrs["properties"])
90         if err != nil {
91                 return arvados.Collection{}, err
92         }
93         if len(opts.Select) > 0 {
94                 // We need to know IsTrashed and TrashAt to implement
95                 // signing properly, even if the caller doesn't want
96                 // them.
97                 opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
98         }
99         if opts.Attrs, err = conn.applyReplaceFilesOption(ctx, opts.UUID, opts.Attrs, opts.ReplaceFiles); err != nil {
100                 return arvados.Collection{}, err
101         }
102         resp, err := conn.railsProxy.CollectionUpdate(ctx, opts)
103         if err != nil {
104                 return resp, err
105         }
106         conn.signCollection(ctx, &resp)
107         return resp, nil
108 }
109
110 func (conn *Conn) signCollection(ctx context.Context, coll *arvados.Collection) {
111         if coll.IsTrashed || coll.ManifestText == "" || !conn.cluster.Collections.BlobSigning {
112                 return
113         }
114         var token string
115         if creds, ok := auth.FromContext(ctx); ok && len(creds.Tokens) > 0 {
116                 token = creds.Tokens[0]
117         }
118         if token == "" {
119                 return
120         }
121         ttl := conn.cluster.Collections.BlobSigningTTL.Duration()
122         exp := time.Now().Add(ttl)
123         if coll.TrashAt != nil && !coll.TrashAt.IsZero() && coll.TrashAt.Before(exp) {
124                 exp = *coll.TrashAt
125         }
126         coll.ManifestText = arvados.SignManifest(coll.ManifestText, token, exp, ttl, []byte(conn.cluster.Collections.BlobSigningKey))
127 }
128
129 // If replaceFiles is non-empty, populate attrs["manifest_text"] by
130 // starting with the content of fromUUID (or an empty collection if
131 // fromUUID is empty) and applying the specified file/directory
132 // replacements.
133 //
134 // Return value is the (possibly modified) attrs map.
135 func (conn *Conn) applyReplaceFilesOption(ctx context.Context, fromUUID string, attrs map[string]interface{}, replaceFiles map[string]string) (map[string]interface{}, error) {
136         if len(replaceFiles) == 0 {
137                 return attrs, nil
138         } else if mtxt, ok := attrs["manifest_text"].(string); ok && len(mtxt) > 0 {
139                 return nil, httpserver.Errorf(http.StatusBadRequest, "ambiguous request: both 'replace_files' and attrs['manifest_text'] values provided")
140         }
141
142         // Load the current collection (if any) and set up an
143         // in-memory filesystem.
144         var dst arvados.Collection
145         if _, replacingRoot := replaceFiles["/"]; !replacingRoot && fromUUID != "" {
146                 src, err := conn.CollectionGet(ctx, arvados.GetOptions{UUID: fromUUID})
147                 if err != nil {
148                         return nil, err
149                 }
150                 dst = src
151         }
152         dstfs, err := dst.FileSystem(&arvados.StubClient{}, &arvados.StubClient{})
153         if err != nil {
154                 return nil, err
155         }
156
157         // Sort replacements by source collection to avoid redundant
158         // reloads when a source collection is used more than
159         // once. Note empty sources (which mean "delete target path")
160         // sort first.
161         dstTodo := make([]string, 0, len(replaceFiles))
162         {
163                 srcid := make(map[string]string, len(replaceFiles))
164                 for dst, src := range replaceFiles {
165                         dstTodo = append(dstTodo, dst)
166                         if i := strings.IndexRune(src, '/'); i > 0 {
167                                 srcid[dst] = src[:i]
168                         }
169                 }
170                 sort.Slice(dstTodo, func(i, j int) bool {
171                         return srcid[dstTodo[i]] < srcid[dstTodo[j]]
172                 })
173         }
174
175         // Reject attempt to replace a node as well as its descendant
176         // (e.g., a/ and a/b/), which is unsupported, except where the
177         // source for a/ is empty (i.e., delete).
178         for _, dst := range dstTodo {
179                 if dst != "/" && (strings.HasSuffix(dst, "/") ||
180                         strings.HasSuffix(dst, "/.") ||
181                         strings.HasSuffix(dst, "/..") ||
182                         strings.Contains(dst, "//") ||
183                         strings.Contains(dst, "/./") ||
184                         strings.Contains(dst, "/../") ||
185                         !strings.HasPrefix(dst, "/")) {
186                         return nil, httpserver.Errorf(http.StatusBadRequest, "invalid replace_files target: %q", dst)
187                 }
188                 for i := 0; i < len(dst)-1; i++ {
189                         if dst[i] != '/' {
190                                 continue
191                         }
192                         outerdst := dst[:i]
193                         if outerdst == "" {
194                                 outerdst = "/"
195                         }
196                         if outersrc := replaceFiles[outerdst]; outersrc != "" {
197                                 return nil, httpserver.Errorf(http.StatusBadRequest, "replace_files: cannot operate on target %q inside non-empty target %q", dst, outerdst)
198                         }
199                 }
200         }
201
202         var srcidloaded string
203         var srcfs arvados.FileSystem
204         // Apply the requested replacements.
205         for _, dst := range dstTodo {
206                 src := replaceFiles[dst]
207                 if src == "" {
208                         if dst == "/" {
209                                 // In this case we started with a
210                                 // blank manifest, so there can't be
211                                 // anything to delete.
212                                 continue
213                         }
214                         err := dstfs.RemoveAll(dst)
215                         if err != nil {
216                                 return nil, fmt.Errorf("RemoveAll(%s): %w", dst, err)
217                         }
218                         continue
219                 }
220                 srcspec := strings.SplitN(src, "/", 2)
221                 srcid, srcpath := srcspec[0], "/"
222                 if !arvadosclient.PDHMatch(srcid) {
223                         return nil, httpserver.Errorf(http.StatusBadRequest, "invalid source %q for replace_files[%q]: must be \"\" or \"PDH\" or \"PDH/path\"", src, dst)
224                 }
225                 if len(srcspec) == 2 && srcspec[1] != "" {
226                         srcpath = srcspec[1]
227                 }
228                 if srcidloaded != srcid {
229                         srcfs = nil
230                         srccoll, err := conn.CollectionGet(ctx, arvados.GetOptions{UUID: srcid})
231                         if err != nil {
232                                 return nil, err
233                         }
234                         // We use StubClient here because we don't
235                         // want srcfs to read/write any file data or
236                         // sync collection state to/from the database.
237                         srcfs, err = srccoll.FileSystem(&arvados.StubClient{}, &arvados.StubClient{})
238                         if err != nil {
239                                 return nil, err
240                         }
241                         srcidloaded = srcid
242                 }
243                 snap, err := arvados.Snapshot(srcfs, srcpath)
244                 if err != nil {
245                         return nil, httpserver.Errorf(http.StatusBadRequest, "error getting snapshot of %q from %q: %w", srcpath, srcid, err)
246                 }
247                 // Create intermediate dirs, in case dst is
248                 // "newdir1/newdir2/dst".
249                 for i := 1; i < len(dst)-1; i++ {
250                         if dst[i] == '/' {
251                                 err = dstfs.Mkdir(dst[:i], 0777)
252                                 if err != nil && !os.IsExist(err) {
253                                         return nil, httpserver.Errorf(http.StatusBadRequest, "error creating parent dirs for %q: %w", dst, err)
254                                 }
255                         }
256                 }
257                 err = arvados.Splice(dstfs, dst, snap)
258                 if err != nil {
259                         return nil, fmt.Errorf("error splicing snapshot onto path %q: %w", dst, err)
260                 }
261         }
262         mtxt, err := dstfs.MarshalManifest(".")
263         if err != nil {
264                 return nil, err
265         }
266         if attrs == nil {
267                 attrs = make(map[string]interface{}, 1)
268         }
269         attrs["manifest_text"] = mtxt
270         return attrs, nil
271 }