21701: Test replace_files using provided manifest_text as source.
[arvados.git] / lib / controller / localdb / collection_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "fmt"
9         "io/fs"
10         "path/filepath"
11         "regexp"
12         "strconv"
13         "strings"
14         "sync"
15         "sync/atomic"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/ctrlctx"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
21         "git.arvados.org/arvados.git/sdk/go/arvadostest"
22         "git.arvados.org/arvados.git/sdk/go/keepclient"
23         check "gopkg.in/check.v1"
24 )
25
// Register CollectionSuite with gocheck.
var _ = check.Suite(&CollectionSuite{})

// CollectionSuite holds the collection tests in this file. It adds no
// fields of its own: everything the tests use through s (s.localdb,
// s.userctx, s.setUpVocabulary, ...) is promoted from the embedded
// localdbSuite.
type CollectionSuite struct {
	localdbSuite
}
31
32 func (s *CollectionSuite) TestCollectionCreateAndUpdateWithProperties(c *check.C) {
33         s.setUpVocabulary(c, "")
34
35         tests := []struct {
36                 name    string
37                 props   map[string]interface{}
38                 success bool
39         }{
40                 {"Invalid prop key", map[string]interface{}{"Priority": "IDVALIMPORTANCES1"}, false},
41                 {"Invalid prop value", map[string]interface{}{"IDTAGIMPORTANCES": "high"}, false},
42                 {"Valid prop key & value", map[string]interface{}{"IDTAGIMPORTANCES": "IDVALIMPORTANCES1"}, true},
43                 {"Empty properties", map[string]interface{}{}, true},
44         }
45         for _, tt := range tests {
46                 c.Log(c.TestName()+" ", tt.name)
47
48                 // Create with properties
49                 coll, err := s.localdb.CollectionCreate(s.userctx, arvados.CreateOptions{
50                         Select: []string{"uuid", "properties"},
51                         Attrs: map[string]interface{}{
52                                 "properties": tt.props,
53                         }})
54                 if tt.success {
55                         c.Assert(err, check.IsNil)
56                         c.Assert(coll.Properties, check.DeepEquals, tt.props)
57                 } else {
58                         c.Assert(err, check.NotNil)
59                 }
60
61                 // Create, then update with properties
62                 coll, err = s.localdb.CollectionCreate(s.userctx, arvados.CreateOptions{})
63                 c.Assert(err, check.IsNil)
64                 coll, err = s.localdb.CollectionUpdate(s.userctx, arvados.UpdateOptions{
65                         UUID:   coll.UUID,
66                         Select: []string{"uuid", "properties"},
67                         Attrs: map[string]interface{}{
68                                 "properties": tt.props,
69                         }})
70                 if tt.success {
71                         c.Assert(err, check.IsNil)
72                         c.Assert(coll.Properties, check.DeepEquals, tt.props)
73                 } else {
74                         c.Assert(err, check.NotNil)
75                 }
76         }
77 }
78
79 func (s *CollectionSuite) TestSignatures(c *check.C) {
80         resp, err := s.localdb.CollectionGet(s.userctx, arvados.GetOptions{UUID: arvadostest.FooCollection})
81         c.Check(err, check.IsNil)
82         c.Check(resp.ManifestText, check.Matches, `(?ms).* acbd[^ ]*\+3\+A[0-9a-f]+@[0-9a-f]+ 0:.*`)
83         s.checkSignatureExpiry(c, resp.ManifestText, time.Hour*24*7*2)
84
85         resp, err = s.localdb.CollectionGet(s.userctx, arvados.GetOptions{UUID: arvadostest.FooCollection, Select: []string{"manifest_text"}})
86         c.Check(err, check.IsNil)
87         c.Check(resp.ManifestText, check.Matches, `(?ms).* acbd[^ ]*\+3\+A[0-9a-f]+@[0-9a-f]+ 0:.*`)
88
89         lresp, err := s.localdb.CollectionList(s.userctx, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", arvadostest.FooCollection}}})
90         c.Check(err, check.IsNil)
91         if c.Check(lresp.Items, check.HasLen, 1) {
92                 c.Check(lresp.Items[0].UUID, check.Equals, arvadostest.FooCollection)
93                 c.Check(lresp.Items[0].ManifestText, check.Equals, "")
94                 c.Check(lresp.Items[0].UnsignedManifestText, check.Equals, "")
95         }
96
97         lresp, err = s.localdb.CollectionList(s.userctx, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", arvadostest.FooCollection}}, Select: []string{"manifest_text"}})
98         c.Check(err, check.IsNil)
99         if c.Check(lresp.Items, check.HasLen, 1) {
100                 c.Check(lresp.Items[0].ManifestText, check.Matches, `(?ms).* acbd[^ ]*\+3\+A[0-9a-f]+@[0-9a-f]+ 0:.*`)
101                 c.Check(lresp.Items[0].UnsignedManifestText, check.Equals, "")
102         }
103
104         lresp, err = s.localdb.CollectionList(s.userctx, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", arvadostest.FooCollection}}, Select: []string{"unsigned_manifest_text"}})
105         c.Check(err, check.IsNil)
106         if c.Check(lresp.Items, check.HasLen, 1) {
107                 c.Check(lresp.Items[0].ManifestText, check.Equals, "")
108                 c.Check(lresp.Items[0].UnsignedManifestText, check.Matches, `(?ms).* acbd[^ ]*\+3 0:.*`)
109         }
110
111         // early trash date causes lower signature TTL (even if
112         // trash_at and is_trashed fields are unselected)
113         trashed, err := s.localdb.CollectionCreate(s.userctx, arvados.CreateOptions{
114                 Select: []string{"uuid", "manifest_text"},
115                 Attrs: map[string]interface{}{
116                         "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
117                         "trash_at":      time.Now().UTC().Add(time.Hour),
118                 }})
119         c.Assert(err, check.IsNil)
120         s.checkSignatureExpiry(c, trashed.ManifestText, time.Hour)
121         resp, err = s.localdb.CollectionGet(s.userctx, arvados.GetOptions{UUID: trashed.UUID})
122         c.Assert(err, check.IsNil)
123         s.checkSignatureExpiry(c, resp.ManifestText, time.Hour)
124
125         // distant future trash date does not cause higher signature TTL
126         trashed, err = s.localdb.CollectionUpdate(s.userctx, arvados.UpdateOptions{
127                 UUID: trashed.UUID,
128                 Attrs: map[string]interface{}{
129                         "trash_at": time.Now().UTC().Add(time.Hour * 24 * 365),
130                 }})
131         c.Assert(err, check.IsNil)
132         s.checkSignatureExpiry(c, trashed.ManifestText, time.Hour*24*7*2)
133         resp, err = s.localdb.CollectionGet(s.userctx, arvados.GetOptions{UUID: trashed.UUID})
134         c.Assert(err, check.IsNil)
135         s.checkSignatureExpiry(c, resp.ManifestText, time.Hour*24*7*2)
136
137         // Make sure groups/contents doesn't return manifest_text with
138         // collections (if it did, we'd need to sign it).
139         gresp, err := s.localdb.GroupContents(s.userctx, arvados.GroupContentsOptions{
140                 Limit:   -1,
141                 Filters: []arvados.Filter{{"uuid", "=", arvadostest.FooCollection}},
142                 Select:  []string{"uuid", "manifest_text"},
143         })
144         if err != nil {
145                 c.Check(err, check.ErrorMatches, `.*Invalid attribute.*manifest_text.*`)
146         } else if c.Check(gresp.Items, check.HasLen, 1) {
147                 c.Check(gresp.Items[0].(map[string]interface{})["uuid"], check.Equals, arvadostest.FooCollection)
148                 c.Check(gresp.Items[0].(map[string]interface{})["manifest_text"], check.Equals, nil)
149         }
150 }
151
152 func (s *CollectionSuite) checkSignatureExpiry(c *check.C, manifestText string, expectedTTL time.Duration) {
153         m := regexp.MustCompile(`@([[:xdigit:]]+)`).FindStringSubmatch(manifestText)
154         c.Assert(m, check.HasLen, 2)
155         sigexp, err := strconv.ParseInt(m[1], 16, 64)
156         c.Assert(err, check.IsNil)
157         expectedExp := time.Now().Add(expectedTTL).Unix()
158         c.Check(sigexp > expectedExp-60, check.Equals, true)
159         c.Check(sigexp <= expectedExp, check.Equals, true)
160 }
161
162 func (s *CollectionSuite) TestSignaturesDisabled(c *check.C) {
163         s.localdb.cluster.Collections.BlobSigning = false
164         resp, err := s.localdb.CollectionGet(s.userctx, arvados.GetOptions{UUID: arvadostest.FooCollection})
165         c.Check(err, check.IsNil)
166         c.Check(resp.ManifestText, check.Matches, `(?ms).* acbd[^ +]*\+3 0:.*`)
167 }
168
// Register replaceFilesSuite with gocheck.
var _ = check.Suite(&replaceFilesSuite{})

// replaceFilesSuite holds the tests that exercise the ReplaceFiles
// option of collection create/update requests. It embeds
// CollectionSuite for the shared localdb fixtures.
type replaceFilesSuite struct {
	CollectionSuite
	// client, ac and kc are initialized once in SetUpSuite; client
	// and kc are used by expectFileSizes to open a collection's
	// filesystem.
	client *arvados.Client
	ac     *arvadosclient.ArvadosClient
	kc     *keepclient.KeepClient
	// foo and tmp are created by SetUpTest.
	foo arvados.Collection // contains /foo.txt
	tmp arvados.Collection // working collection, initially contains /foo.txt
}
179
180 func (s *replaceFilesSuite) SetUpSuite(c *check.C) {
181         s.CollectionSuite.SetUpSuite(c)
182         var err error
183         s.client = arvados.NewClientFromEnv()
184         s.ac, err = arvadosclient.New(s.client)
185         c.Assert(err, check.IsNil)
186         s.kc, err = keepclient.MakeKeepClient(s.ac)
187         c.Assert(err, check.IsNil)
188 }
189
190 func (s *replaceFilesSuite) SetUpTest(c *check.C) {
191         s.CollectionSuite.SetUpTest(c)
192         // Unlike most test suites, we need to COMMIT our setup --
193         // otherwise, when our tests start additional
194         // transactions/connections, they won't see our setup.
195         ctx, txFinish := ctrlctx.New(s.ctx, s.dbConnector.GetDB)
196         defer txFinish(new(error))
197         adminctx := ctrlctx.NewWithToken(ctx, s.cluster, arvadostest.AdminToken)
198         var err error
199         s.foo, err = s.localdb.railsProxy.CollectionCreate(adminctx, arvados.CreateOptions{
200                 Attrs: map[string]interface{}{
201                         "owner_uuid":    arvadostest.ActiveUserUUID,
202                         "manifest_text": ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n",
203                 }})
204         c.Assert(err, check.IsNil)
205         s.tmp, err = s.localdb.CollectionCreate(s.userctx, arvados.CreateOptions{
206                 ReplaceFiles: map[string]string{
207                         "/foo.txt": s.foo.PortableDataHash + "/foo.txt",
208                 },
209                 Attrs: map[string]interface{}{
210                         "owner_uuid": arvadostest.ActiveUserUUID,
211                 }})
212         c.Assert(err, check.IsNil)
213         s.expectFiles(c, s.tmp, "foo.txt")
214 }
215
216 func (s *replaceFilesSuite) TestCollectionReplaceFiles(c *check.C) {
217         adminctx := ctrlctx.NewWithToken(s.ctx, s.cluster, arvadostest.AdminToken)
218         foobarbaz, err := s.localdb.railsProxy.CollectionCreate(adminctx, arvados.CreateOptions{
219                 Attrs: map[string]interface{}{
220                         "owner_uuid":    arvadostest.ActiveUserUUID,
221                         "manifest_text": "./foo/bar 73feffa4b7f6bb68e44cf984c85f6e88+3 0:3:baz.txt\n",
222                 }})
223         c.Assert(err, check.IsNil)
224         wazqux, err := s.localdb.railsProxy.CollectionCreate(adminctx, arvados.CreateOptions{
225                 Attrs: map[string]interface{}{
226                         "owner_uuid":    arvadostest.ActiveUserUUID,
227                         "manifest_text": "./waz d85b1213473c2fd7c2045020a6b9c62b+3 0:3:qux.txt\n",
228                 }})
229         c.Assert(err, check.IsNil)
230
231         // Create using content from existing collections
232         dst, err := s.localdb.CollectionCreate(s.userctx, arvados.CreateOptions{
233                 ReplaceFiles: map[string]string{
234                         "/f": s.foo.PortableDataHash + "/foo.txt",
235                         "/b": foobarbaz.PortableDataHash + "/foo/bar",
236                         "/q": wazqux.PortableDataHash + "/",
237                         "/w": wazqux.PortableDataHash + "/waz",
238                 },
239                 Attrs: map[string]interface{}{
240                         "owner_uuid": arvadostest.ActiveUserUUID,
241                 }})
242         c.Assert(err, check.IsNil)
243         s.expectFiles(c, dst, "f", "b/baz.txt", "q/waz/qux.txt", "w/qux.txt")
244
245         // Delete a file and a directory
246         dst, err = s.localdb.CollectionUpdate(s.userctx, arvados.UpdateOptions{
247                 UUID: dst.UUID,
248                 ReplaceFiles: map[string]string{
249                         "/f":     "",
250                         "/q/waz": "",
251                 }})
252         c.Assert(err, check.IsNil)
253         s.expectFiles(c, dst, "b/baz.txt", "q/", "w/qux.txt")
254
255         // Move and copy content within collection
256         dst, err = s.localdb.CollectionUpdate(s.userctx, arvados.UpdateOptions{
257                 UUID: dst.UUID,
258                 ReplaceFiles: map[string]string{
259                         // Note splicing content to /b/corge.txt but
260                         // removing everything else from /b
261                         "/b":              "",
262                         "/b/corge.txt":    dst.PortableDataHash + "/b/baz.txt",
263                         "/quux/corge.txt": dst.PortableDataHash + "/b/baz.txt",
264                 }})
265         c.Assert(err, check.IsNil)
266         s.expectFiles(c, dst, "b/corge.txt", "q/", "w/qux.txt", "quux/corge.txt")
267
268         // Remove everything except one file
269         dst, err = s.localdb.CollectionUpdate(s.userctx, arvados.UpdateOptions{
270                 UUID: dst.UUID,
271                 ReplaceFiles: map[string]string{
272                         "/":            "",
273                         "/b/corge.txt": dst.PortableDataHash + "/b/corge.txt",
274                 }})
275         c.Assert(err, check.IsNil)
276         s.expectFiles(c, dst, "b/corge.txt")
277
278         // Copy entire collection to root
279         dstcopy, err := s.localdb.CollectionCreate(s.userctx, arvados.CreateOptions{
280                 ReplaceFiles: map[string]string{
281                         "/": dst.PortableDataHash,
282                 }})
283         c.Check(err, check.IsNil)
284         c.Check(dstcopy.PortableDataHash, check.Equals, dst.PortableDataHash)
285         s.expectFiles(c, dstcopy, "b/corge.txt")
286
287         // Check invalid targets, sources, and combinations
288         for _, badrepl := range []map[string]string{
289                 {
290                         "/foo/nope": dst.PortableDataHash + "/b",
291                         "/foo":      dst.PortableDataHash + "/b",
292                 },
293                 {
294                         "/foo":      dst.PortableDataHash + "/b",
295                         "/foo/nope": "",
296                 },
297                 {
298                         "/":     dst.PortableDataHash + "/",
299                         "/nope": "",
300                 },
301                 {
302                         "/":     dst.PortableDataHash + "/",
303                         "/nope": dst.PortableDataHash + "/b",
304                 },
305                 {"/bad/": ""},
306                 {"/./bad": ""},
307                 {"/b/./ad": ""},
308                 {"/b/../ad": ""},
309                 {"/b/.": ""},
310                 {".": ""},
311                 {"bad": ""},
312                 {"": ""},
313                 {"/bad": "/b"},
314                 {"/bad": "bad/b"},
315                 {"/bad": dst.UUID + "/b"},
316         } {
317                 _, err = s.localdb.CollectionUpdate(s.userctx, arvados.UpdateOptions{
318                         UUID:         dst.UUID,
319                         ReplaceFiles: badrepl,
320                 })
321                 c.Logf("badrepl %#v\n... got err: %s", badrepl, err)
322                 c.Check(err, check.NotNil)
323         }
324 }
325
326 func (s *replaceFilesSuite) TestMultipleRename(c *check.C) {
327         adminctx := ctrlctx.NewWithToken(s.ctx, s.cluster, arvadostest.AdminToken)
328         tmp, err := s.localdb.CollectionUpdate(adminctx, arvados.UpdateOptions{
329                 UUID: s.tmp.UUID,
330                 Attrs: map[string]interface{}{
331                         "manifest_text": ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1 0:2:file2 0:3:file3\n"}})
332         c.Assert(err, check.IsNil)
333         tmp, err = s.localdb.CollectionUpdate(s.userctx, arvados.UpdateOptions{
334                 UUID: tmp.UUID,
335                 ReplaceFiles: map[string]string{
336                         "/file1":     "current/file2",
337                         "/file2":     "current/file3",
338                         "/file3":     "current/file1",
339                         "/dir/file1": "current/file1",
340                 }})
341         c.Check(err, check.IsNil)
342         s.expectFileSizes(c, tmp, map[string]int64{
343                 "file1":     2,
344                 "file2":     3,
345                 "file3":     1,
346                 "dir/file1": 1,
347         })
348 }
349
350 func (s *replaceFilesSuite) TestConcurrentCopyFromPDH(c *check.C) {
351         var wg sync.WaitGroup
352         var expectFiles []string
353         for i := 0; i < 10; i++ {
354                 fnm := fmt.Sprintf("copy%d.txt", i)
355                 expectFiles = append(expectFiles, fnm)
356                 wg.Add(1)
357                 go func() {
358                         defer wg.Done()
359                         ctx, txFinish := ctrlctx.New(s.ctx, s.dbConnector.GetDB)
360                         defer txFinish(new(error))
361                         userctx := ctrlctx.NewWithToken(ctx, s.cluster, arvadostest.ActiveTokenV2)
362                         _, err := s.localdb.CollectionUpdate(userctx, arvados.UpdateOptions{
363                                 UUID: s.tmp.UUID,
364                                 ReplaceFiles: map[string]string{
365                                         "/" + fnm:  s.foo.PortableDataHash + "/foo.txt",
366                                         "/foo.txt": "",
367                                 }})
368                         c.Check(err, check.IsNil)
369                 }()
370         }
371         wg.Wait()
372         // After N concurrent/overlapping requests to add different
373         // files by copying from another collection, we should see all
374         // N files.
375         final, err := s.localdb.CollectionGet(s.userctx, arvados.GetOptions{UUID: s.tmp.UUID})
376         c.Assert(err, check.IsNil)
377         s.expectFiles(c, final, expectFiles...)
378 }
379
380 func (s *replaceFilesSuite) TestConcurrentCopyFromProvidedManifestText(c *check.C) {
381         blockLocator := strings.Split(s.tmp.ManifestText, " ")[1]
382         var wg sync.WaitGroup
383         expectFileSizes := make(map[string]int64)
384         for i := 0; i < 10; i++ {
385                 fnm := fmt.Sprintf("upload%d.txt", i)
386                 expectFileSizes[fnm] = 2
387                 wg.Add(1)
388                 go func() {
389                         defer wg.Done()
390                         ctx, txFinish := ctrlctx.New(s.ctx, s.dbConnector.GetDB)
391                         defer txFinish(new(error))
392                         userctx := ctrlctx.NewWithToken(ctx, s.cluster, arvadostest.ActiveTokenV2)
393                         _, err := s.localdb.CollectionUpdate(userctx, arvados.UpdateOptions{
394                                 UUID: s.tmp.UUID,
395                                 Attrs: map[string]interface{}{
396                                         "manifest_text": ". " + blockLocator + " 0:2:" + fnm + "\n",
397                                 },
398                                 ReplaceFiles: map[string]string{
399                                         "/" + fnm:  "manifest_text/" + fnm,
400                                         "/foo.txt": "",
401                                 }})
402                         c.Check(err, check.IsNil)
403                 }()
404         }
405         wg.Wait()
406         // After N concurrent/overlapping requests to add different
407         // files, we should see all N files.
408         final, err := s.localdb.CollectionGet(s.userctx, arvados.GetOptions{UUID: s.tmp.UUID})
409         c.Assert(err, check.IsNil)
410         s.expectFileSizes(c, final, expectFileSizes)
411 }
412
413 func (s *replaceFilesSuite) TestConcurrentRename(c *check.C) {
414         var wg sync.WaitGroup
415         var renamed atomic.Int32
416         n := 10
417         errors := make(chan error, n)
418         var newnameOK string
419         for i := 0; i < n; i++ {
420                 newname := fmt.Sprintf("newname%d.txt", i)
421                 wg.Add(1)
422                 go func() {
423                         defer wg.Done()
424                         ctx, txFinish := ctrlctx.New(s.ctx, s.dbConnector.GetDB)
425                         defer txFinish(new(error))
426                         userctx := ctrlctx.NewWithToken(ctx, s.cluster, arvadostest.ActiveTokenV2)
427                         upd, err := s.localdb.CollectionUpdate(userctx, arvados.UpdateOptions{
428                                 UUID: s.tmp.UUID,
429                                 ReplaceFiles: map[string]string{
430                                         "/" + newname: "current/foo.txt",
431                                         "/foo.txt":    "",
432                                 }})
433                         if err != nil {
434                                 errors <- err
435                         } else {
436                                 renamed.Add(1)
437                                 s.expectFiles(c, upd, newname)
438                                 newnameOK = newname
439                         }
440                 }()
441         }
442         wg.Wait()
443         // N concurrent/overlapping attempts to rename foo.txt should
444         // have succeed exactly one time, and the final collection
445         // content should correspond to the operation that returned
446         // success.
447         if !c.Check(int(renamed.Load()), check.Equals, 1) {
448                 close(errors)
449                 for err := range errors {
450                         c.Logf("err: %s", err)
451                 }
452                 return
453         }
454         c.Assert(newnameOK, check.Not(check.Equals), "")
455         final, err := s.localdb.CollectionGet(s.userctx, arvados.GetOptions{UUID: s.tmp.UUID})
456         c.Assert(err, check.IsNil)
457         s.expectFiles(c, final, newnameOK)
458 }
459
460 // expectFiles checks coll's directory structure against the given
461 // list of expected files and empty directories. An expected path with
462 // a trailing slash indicates an empty directory.
463 func (s *replaceFilesSuite) expectFiles(c *check.C, coll arvados.Collection, expected ...string) {
464         expectSizes := make(map[string]int64)
465         for _, path := range expected {
466                 expectSizes[path] = -1
467         }
468         s.expectFileSizes(c, coll, expectSizes)
469 }
470
471 // expectFiles checks coll's directory structure against the given map
472 // of path->size.  An expected path with a trailing slash indicates an
473 // empty directory.  An expect size of -1 indicates the expected file
474 // size does not need to be checked.
475 func (s *replaceFilesSuite) expectFileSizes(c *check.C, coll arvados.Collection, expected map[string]int64) {
476         cfs, err := coll.FileSystem(s.client, s.kc)
477         c.Assert(err, check.IsNil)
478         found := make(map[string]int64)
479         nonemptydirs := map[string]bool{}
480         fs.WalkDir(arvados.FS(cfs), "/", func(path string, d fs.DirEntry, err error) error {
481                 dir, _ := filepath.Split(path)
482                 nonemptydirs[dir] = true
483                 if d.IsDir() {
484                         if path != "/" {
485                                 path += "/"
486                         }
487                         if !nonemptydirs[path] {
488                                 nonemptydirs[path] = false
489                         }
490                 } else {
491                         fi, err := d.Info()
492                         c.Assert(err, check.IsNil)
493                         found[path] = fi.Size()
494                 }
495                 return nil
496         })
497         for d, nonempty := range nonemptydirs {
498                 if !nonempty {
499                         found[d] = 0
500                 }
501         }
502         for path, size := range found {
503                 if trimmed := strings.TrimPrefix(path, "/"); trimmed != path && trimmed != "" {
504                         found[trimmed] = size
505                         delete(found, path)
506                         path = trimmed
507                 }
508                 if expected[path] == -1 {
509                         found[path] = -1
510                 }
511         }
512         c.Check(found, check.DeepEquals, expected)
513 }