18600: Implement collection-update API.
author Tom Clegg <tom@curii.com>
Mon, 7 Mar 2022 18:53:01 +0000 (13:53 -0500)
committer Tom Clegg <tom@curii.com>
Mon, 7 Mar 2022 18:53:01 +0000 (13:53 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/controller/localdb/collection.go
lib/controller/localdb/collection_test.go
sdk/go/arvados/fs_backend.go
sdk/go/httpserver/error.go

index 96c89252ec0285e58dac4330333070c9898cce9e..965b009f45c0996edc954ae708261e45984c2b5a 100644 (file)
@@ -6,10 +6,17 @@ package localdb
 
 import (
        "context"
+       "fmt"
+       "net/http"
+       "os"
+       "sort"
+       "strings"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
 )
 
 // CollectionGet defers to railsProxy for everything except blob
@@ -61,6 +68,9 @@ func (conn *Conn) CollectionCreate(ctx context.Context, opts arvados.CreateOptio
                // them.
                opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
        }
+       if err := conn.applySplices(ctx, "", opts.Attrs); err != nil {
+               return arvados.Collection{}, err
+       }
        resp, err := conn.railsProxy.CollectionCreate(ctx, opts)
        if err != nil {
                return resp, err
@@ -82,6 +92,9 @@ func (conn *Conn) CollectionUpdate(ctx context.Context, opts arvados.UpdateOptio
                // them.
                opts.Select = append([]string{"is_trashed", "trash_at"}, opts.Select...)
        }
+       if err := conn.applySplices(ctx, opts.UUID, opts.Attrs); err != nil {
+               return arvados.Collection{}, err
+       }
        resp, err := conn.railsProxy.CollectionUpdate(ctx, opts)
        if err != nil {
                return resp, err
@@ -108,3 +121,167 @@ func (conn *Conn) signCollection(ctx context.Context, coll *arvados.Collection)
        }
        coll.ManifestText = arvados.SignManifest(coll.ManifestText, token, exp, ttl, []byte(conn.cluster.Collections.BlobSigningKey))
 }
+
+// applySplices implements the "splices" collection attribute.
+//
+// If attrs["splices"] is present and non-empty, populate
+// attrs["manifest_text"] by starting with the content of fromUUID
+// (or an empty collection if fromUUID is empty or a splice targets
+// "/") and applying the specified splice operations, then remove
+// attrs["splices"] so it is not passed on to railsProxy.
+//
+// Each splice maps a target path to a source. Targets must be
+// absolute and normalized, e.g. "/dir/file" (no trailing "/", and
+// no "//", "." or ".." components), and may not be nested inside
+// another target whose source is non-empty. A source is either ""
+// (delete the target) or "PDH[/path]" (replace the target with the
+// given path from the collection with that portable data hash).
+//
+// attrs["splices"] may be a map[string]string, or a
+// map[string]interface{} with string values (as in requests decoded
+// from JSON); nil or empty means no-op. Malformed requests are
+// rejected with http.StatusBadRequest errors.
+func (conn *Conn) applySplices(ctx context.Context, fromUUID string, attrs map[string]interface{}) error {
+       // Validate the incoming attrs, and return early if the
+       // request doesn't ask for any splices.
+       sp, ok := attrs["splices"]
+       if !ok {
+               return nil
+       }
+       var splices map[string]string
+       switch sp := sp.(type) {
+       case nil:
+               // Treated the same as an empty map (below).
+       case map[string]string:
+               splices = sp
+       case map[string]interface{}:
+               splices = make(map[string]string, len(sp))
+               for dst, v := range sp {
+                       src, ok := v.(string)
+                       if !ok {
+                               // Report the offending value's type,
+                               // not the zero value left behind by
+                               // the failed type assertion.
+                               return httpserver.Errorf(http.StatusBadRequest, "invalid source type for splice target %q: %T", dst, v)
+                       }
+                       splices[dst] = src
+               }
+       default:
+               return httpserver.Errorf(http.StatusBadRequest, "invalid type %T for splices parameter", sp)
+       }
+       if len(splices) == 0 {
+               // No-op. Remove the key so it doesn't reach
+               // railsProxy, which presumably doesn't recognize
+               // a "splices" attribute.
+               delete(attrs, "splices")
+               return nil
+       }
+       if mtxt, ok := attrs["manifest_text"].(string); ok && len(mtxt) > 0 {
+               return httpserver.Errorf(http.StatusBadRequest, "ambiguous request: both 'splices' and 'manifest_text' values provided")
+       }
+
+       // Load the current collection (if any) and set up an
+       // in-memory filesystem. A splice at "/" replaces everything,
+       // so in that case there is no need to load fromUUID.
+       var dstcoll arvados.Collection
+       if _, rootsplice := splices["/"]; !rootsplice && fromUUID != "" {
+               var err error
+               dstcoll, err = conn.CollectionGet(ctx, arvados.GetOptions{UUID: fromUUID})
+               if err != nil {
+                       return err
+               }
+       }
+       // StubClient: we only rearrange manifest entries here, so
+       // dstfs must not read/write file data or sync collection
+       // state to/from the database.
+       dstfs, err := dstcoll.FileSystem(&arvados.StubClient{}, &arvados.StubClient{})
+       if err != nil {
+               return err
+       }
+
+       // Sort splice targets by source collection to avoid
+       // redundant reloads when a source collection is used more
+       // than once. Empty sources (which mean "delete target path")
+       // have source ID "" and therefore sort first, so deletions
+       // are applied before any content is spliced in beneath them
+       // (e.g., {"/a": "", "/a/b": "PDH"}). Ties are broken by
+       // target path so the order doesn't depend on map iteration.
+       dstTodo := make([]string, 0, len(splices))
+       srcCollOf := make(map[string]string, len(splices))
+       for dst, src := range splices {
+               dstTodo = append(dstTodo, dst)
+               // The source collection is everything before the
+               // first "/" -- including when src is a bare PDH
+               // with no path, which must not sort along with the
+               // deletions.
+               srcCollOf[dst] = strings.SplitN(src, "/", 2)[0]
+       }
+       sort.Slice(dstTodo, func(i, j int) bool {
+               a, b := dstTodo[i], dstTodo[j]
+               if srcCollOf[a] != srcCollOf[b] {
+                       return srcCollOf[a] < srcCollOf[b]
+               }
+               return a < b
+       })
+
+       // Reject malformed targets, and reject an attempt to splice a
+       // node as well as its descendant (e.g., /a and /a/b), which is
+       // unsupported -- except where the source for the ancestor is
+       // empty (i.e., delete).
+       for _, dst := range dstTodo {
+               if dst != "/" && (strings.HasSuffix(dst, "/") ||
+                       strings.HasSuffix(dst, "/.") ||
+                       strings.HasSuffix(dst, "/..") ||
+                       strings.Contains(dst, "//") ||
+                       strings.Contains(dst, "/./") ||
+                       strings.Contains(dst, "/../") ||
+                       !strings.HasPrefix(dst, "/")) {
+                       return httpserver.Errorf(http.StatusBadRequest, "invalid splice target: %q", dst)
+               }
+               for i := 0; i < len(dst)-1; i++ {
+                       if dst[i] != '/' {
+                               continue
+                       }
+                       outerdst := dst[:i]
+                       if outerdst == "" {
+                               outerdst = "/"
+                       }
+                       if outersrc := splices[outerdst]; outersrc != "" {
+                               return httpserver.Errorf(http.StatusBadRequest, "cannot splice at target %q with non-empty splice at %q", dst, outerdst)
+                       }
+               }
+       }
+
+       // Apply the requested splices.
+       var srcidloaded string
+       var srcfs arvados.FileSystem
+       for _, dst := range dstTodo {
+               src := splices[dst]
+               if src == "" {
+                       if dst == "/" {
+                               // In this case we started with a
+                               // blank manifest, so there can't be
+                               // anything to delete.
+                               continue
+                       }
+                       if err := dstfs.RemoveAll(dst); err != nil {
+                               return fmt.Errorf("RemoveAll(%s): %w", dst, err)
+                       }
+                       continue
+               }
+               srcspec := strings.SplitN(src, "/", 2)
+               srcid, srcpath := srcspec[0], "/"
+               if !arvadosclient.PDHMatch(srcid) {
+                       return httpserver.Errorf(http.StatusBadRequest, "invalid source %q for splices[%q]: must be \"\" or \"PDH[/path]\"", src, dst)
+               }
+               if len(srcspec) == 2 && srcspec[1] != "" {
+                       srcpath = srcspec[1]
+               }
+               if srcidloaded != srcid {
+                       // dstTodo is grouped by source, so each
+                       // source collection is loaded only once.
+                       srccoll, err := conn.CollectionGet(ctx, arvados.GetOptions{UUID: srcid})
+                       if err != nil {
+                               return err
+                       }
+                       // We use StubClient here because we don't
+                       // want srcfs to read/write any file data or
+                       // sync collection state to/from the database.
+                       srcfs, err = srccoll.FileSystem(&arvados.StubClient{}, &arvados.StubClient{})
+                       if err != nil {
+                               return err
+                       }
+                       srcidloaded = srcid
+               }
+               snap, err := arvados.Snapshot(srcfs, srcpath)
+               if err != nil {
+                       return httpserver.Errorf(http.StatusBadRequest, "error getting snapshot of %q from %q: %w", srcpath, srcid, err)
+               }
+               // Create intermediate dirs, in case dst is
+               // "/newdir1/newdir2/dst".
+               for i := 1; i < len(dst)-1; i++ {
+                       if dst[i] == '/' {
+                               err = dstfs.Mkdir(dst[:i], 0777)
+                               if err != nil && !os.IsExist(err) {
+                                       return httpserver.Errorf(http.StatusBadRequest, "error creating parent dirs for %q: %w", dst, err)
+                               }
+                       }
+               }
+               err = arvados.Splice(dstfs, dst, snap)
+               if err != nil {
+                       return fmt.Errorf("error splicing snapshot onto path %q: %w", dst, err)
+               }
+       }
+
+       mtxt, err := dstfs.MarshalManifest(".")
+       if err != nil {
+               return err
+       }
+       delete(attrs, "splices")
+       attrs["manifest_text"] = mtxt
+       return nil
+}
index 36108b435182744db99b8564b349205c56f7944a..27cdf3899e42fb230168c6b8444502cced5e87f9 100644 (file)
@@ -7,9 +7,11 @@ package localdb
 import (
        "context"
        "io/fs"
+       "path/filepath"
        "regexp"
        "sort"
        "strconv"
+       "strings"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
@@ -175,30 +177,102 @@ func (s *CollectionSuite) TestCollectionUpdateFiles(c *check.C) {
        c.Assert(err, check.IsNil)
        s.expectFiles(c, dst, "b/baz.txt", "q/", "w/qux.txt")
 
-       // Move content within collection
+       // Move and copy content within collection
        dst, err = s.localdb.CollectionUpdate(ctx, arvados.UpdateOptions{
                UUID: dst.UUID,
                Attrs: map[string]interface{}{
                        "splices": map[string]string{
+                               // Note splicing content to
+                               // /b/corge.txt but removing
+                               // everything else from /b
                                "/b":              "",
+                               "/b/corge.txt":    dst.PortableDataHash + "/b/baz.txt",
                                "/quux/corge.txt": dst.PortableDataHash + "/b/baz.txt",
                        },
                }})
        c.Assert(err, check.IsNil)
-       s.expectFiles(c, dst, "q/", "w/qux.txt", "quux/corge.txt")
-}
+       s.expectFiles(c, dst, "b/corge.txt", "q/", "w/qux.txt", "quux/corge.txt")
 
-// Wrap arvados.FileSystem to satisfy the fs.FS interface (until the
-// SDK offers a neater solution) so we can use fs.WalkDir().
-type filesystemfs struct {
-       arvados.FileSystem
-}
+       // Remove everything except one file
+       dst, err = s.localdb.CollectionUpdate(ctx, arvados.UpdateOptions{
+               UUID: dst.UUID,
+               Attrs: map[string]interface{}{
+                       "splices": map[string]string{
+                               "/":            "",
+                               "/b/corge.txt": dst.PortableDataHash + "/b/corge.txt",
+                       },
+               }})
+       c.Assert(err, check.IsNil)
+       s.expectFiles(c, dst, "b/corge.txt")
 
-func (fs filesystemfs) Open(path string) (fs.File, error) {
-       f, err := fs.FileSystem.Open(path)
-       return f, err
+       // Copy entire collection to root
+       dstcopy, err := s.localdb.CollectionCreate(ctx, arvados.CreateOptions{
+               Attrs: map[string]interface{}{
+                       // Note map[string]interface{} here, which is
+                       // how lib/controller/router requests will
+                       // look.
+                       "splices": map[string]interface{}{
+                               "/": dst.PortableDataHash,
+                       },
+               }})
+       c.Check(err, check.IsNil)
+       c.Check(dstcopy.PortableDataHash, check.Equals, dst.PortableDataHash)
+       s.expectFiles(c, dstcopy, "b/corge.txt")
+
+       for _, splices := range []map[string]string{
+               {
+                       "/foo/nope": dst.PortableDataHash + "/b",
+                       "/foo":      dst.PortableDataHash + "/b",
+               },
+               {
+                       "/foo":      dst.PortableDataHash + "/b",
+                       "/foo/nope": "",
+               },
+               {
+                       "/":     dst.PortableDataHash + "/",
+                       "/nope": "",
+               },
+               {
+                       "/":     dst.PortableDataHash + "/",
+                       "/nope": dst.PortableDataHash + "/b",
+               },
+               {"/bad/": ""},
+               {"/./bad": ""},
+               {"/b/./ad": ""},
+               {"/b/../ad": ""},
+               {"/b/.": ""},
+               {".": ""},
+               {"bad": ""},
+               {"": ""},
+               {"/bad": "/b"},
+               {"/bad": "bad/b"},
+               {"/bad": dst.UUID + "/b"},
+       } {
+               _, err = s.localdb.CollectionUpdate(ctx, arvados.UpdateOptions{
+                       UUID: dst.UUID,
+                       Attrs: map[string]interface{}{
+                               "splices": splices,
+                       }})
+               c.Logf("splices %#v\n... got err: %s", splices, err)
+               c.Check(err, check.NotNil)
+       }
+       for _, splices := range []interface{}{
+               map[string]int{"foo": 1},
+               map[int]string{1: "foo"},
+       } {
+               _, err = s.localdb.CollectionUpdate(ctx, arvados.UpdateOptions{
+                       UUID: dst.UUID,
+                       Attrs: map[string]interface{}{
+                               "splices": splices,
+                       }})
+               c.Logf("splices %#v\n... got err: %s", splices, err)
+               c.Check(err, check.NotNil)
+       }
 }
 
+// expectFiles checks coll's directory structure against the given
+// list of expected files and empty directories. An expected path with
+// a trailing slash indicates an empty directory.
 func (s *CollectionSuite) expectFiles(c *check.C, coll arvados.Collection, expected ...string) {
        client := arvados.NewClientFromEnv()
        ac, err := arvadosclient.New(client)
@@ -208,10 +282,32 @@ func (s *CollectionSuite) expectFiles(c *check.C, coll arvados.Collection, expec
        cfs, err := coll.FileSystem(arvados.NewClientFromEnv(), kc)
        c.Assert(err, check.IsNil)
        var found []string
-       fs.WalkDir(filesystemfs{cfs}, "/", func(path string, d fs.DirEntry, err error) error {
-               found = append(found, path)
+       nonemptydirs := map[string]bool{}
+       fs.WalkDir(arvados.FS(cfs), "/", func(path string, d fs.DirEntry, err error) error {
+               dir, _ := filepath.Split(path)
+               nonemptydirs[dir] = true
+               if d.IsDir() {
+                       if path != "/" {
+                               path += "/"
+                       }
+                       if !nonemptydirs[path] {
+                               nonemptydirs[path] = false
+                       }
+               } else {
+                       found = append(found, path)
+               }
                return nil
        })
+       for d, nonempty := range nonemptydirs {
+               if !nonempty {
+                       found = append(found, d)
+               }
+       }
+       for i, path := range found {
+               if path != "/" {
+                       found[i] = strings.TrimPrefix(path, "/")
+               }
+       }
        sort.Strings(found)
        sort.Strings(expected)
        c.Check(found, check.DeepEquals, expected)
index 445ac8103008f0104f1f53d731843bc893ea3d33..cc4c32ffe9bc520e48a46da858b6037b541f8bb4 100644 (file)
@@ -37,7 +37,7 @@ var errStubClient = errors.New("stub client")
 type StubClient struct{}
 
+// ReadAt always fails with errStubClient.
 func (*StubClient) ReadAt(string, []byte, int) (int, error) { return 0, errStubClient }
-func (*StubClient) LocalLocator(string) (string, error)     { return "", errStubClient }
+// LocalLocator returns loc unchanged and never fails.
+// NOTE(review): presumably this lets StubClient-backed filesystems
+// load and marshal (signed) manifests without a real Keep client --
+// confirm.
+func (*StubClient) LocalLocator(loc string) (string, error) { return loc, nil }
+// BlockWrite always fails with errStubClient.
 func (*StubClient) BlockWrite(context.Context, BlockWriteOptions) (BlockWriteResponse, error) {
        return BlockWriteResponse{}, errStubClient
 }
index f1817d3374ae11e07f4479de77b1b1b67e4b3cf9..75ff85336fada402d3fab4dedece6f634f8aaed9 100644 (file)
@@ -6,9 +6,14 @@ package httpserver
 
 import (
        "encoding/json"
+       "fmt"
        "net/http"
 )
 
+func Errorf(status int, tmpl string, args ...interface{}) error {
+       return errorWithStatus{fmt.Errorf(tmpl, args...), status}
+}
+
+// ErrorWithStatus returns an error that pairs err with the given
+// HTTP status code.
 func ErrorWithStatus(err error, status int) error {
        return errorWithStatus{err, status}
 }