19362: Sync s3 updates to long-lived session for same token.
author Tom Clegg <tom@curii.com>
Thu, 15 Sep 2022 20:24:15 +0000 (16:24 -0400)
committer Tom Clegg <tom@curii.com>
Tue, 20 Sep 2022 20:32:10 +0000 (16:32 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_project.go
sdk/go/arvados/fs_site.go
services/keep-web/s3.go
services/keep-web/s3_test.go

index 5569554ab883b2b9f31748bf983f592b43f96b65..274d20702287ed464d4ea8f3796e528c3f61b30b 100644 (file)
@@ -642,7 +642,7 @@ func (fs *fileSystem) Rename(oldname, newname string) error {
        locked := map[sync.Locker]bool{}
        for i := len(needLock) - 1; i >= 0; i-- {
                n := needLock[i]
        locked := map[sync.Locker]bool{}
        for i := len(needLock) - 1; i >= 0; i-- {
                n := needLock[i]
-               if fs, ok := n.(FileSystem); ok {
+               if fs, ok := n.(interface{ rootnode() inode }); ok {
                        // Lock the fs's root dir directly, not
                        // through the fs. Otherwise our "locked" map
                        // would not reliably prevent double-locking
                        // Lock the fs's root dir directly, not
                        // through the fs. Otherwise our "locked" map
                        // would not reliably prevent double-locking
index 88766129ae557962ecb4eb72282ccd2759096e6d..a68e83945e348a0f3e740b7c0a313155917b795a 100644 (file)
@@ -77,7 +77,7 @@ func (fs *customFileSystem) projectsLoadOne(parent inode, uuid, name string) (in
                        name:   coll.Name,
                }, nil
        } else if strings.Contains(coll.UUID, "-4zz18-") {
                        name:   coll.Name,
                }, nil
        } else if strings.Contains(coll.UUID, "-4zz18-") {
-               return fs.newDeferredCollectionDir(parent, name, coll.UUID, coll.ModifiedAt), nil
+               return fs.newDeferredCollectionDir(parent, name, coll.UUID, coll.ModifiedAt, coll.Properties), nil
        } else {
                log.Printf("group contents: unrecognized UUID in response: %q", coll.UUID)
                return nil, ErrInvalidArgument
        } else {
                log.Printf("group contents: unrecognized UUID in response: %q", coll.UUID)
                return nil, ErrInvalidArgument
@@ -147,7 +147,7 @@ func (fs *customFileSystem) projectsLoadAll(parent inode, uuid string) ([]inode,
                                                Properties: i.Properties,
                                        }))
                                } else if strings.Contains(i.UUID, "-4zz18-") {
                                                Properties: i.Properties,
                                        }))
                                } else if strings.Contains(i.UUID, "-4zz18-") {
-                                       inodes = append(inodes, fs.newDeferredCollectionDir(parent, i.Name, i.UUID, i.ModifiedAt))
+                                       inodes = append(inodes, fs.newDeferredCollectionDir(parent, i.Name, i.UUID, i.ModifiedAt, i.Properties))
                                } else {
                                        log.Printf("group contents: unrecognized UUID in response: %q", i.UUID)
                                        return nil, ErrInvalidArgument
                                } else {
                                        log.Printf("group contents: unrecognized UUID in response: %q", i.UUID)
                                        return nil, ErrInvalidArgument
@@ -163,7 +163,7 @@ func (fs *customFileSystem) newProjectDir(parent inode, name, uuid string, proj
        return &hardlink{inode: fs.projectSingleton(uuid, proj), parent: parent, name: name}
 }
 
        return &hardlink{inode: fs.projectSingleton(uuid, proj), parent: parent, name: name}
 }
 
-func (fs *customFileSystem) newDeferredCollectionDir(parent inode, name, uuid string, modTime time.Time) inode {
+func (fs *customFileSystem) newDeferredCollectionDir(parent inode, name, uuid string, modTime time.Time, props map[string]interface{}) inode {
        if modTime.IsZero() {
                modTime = time.Now()
        }
        if modTime.IsZero() {
                modTime = time.Now()
        }
@@ -175,7 +175,7 @@ func (fs *customFileSystem) newDeferredCollectionDir(parent inode, name, uuid st
                        name:    name,
                        modTime: modTime,
                        mode:    0755 | os.ModeDir,
                        name:    name,
                        modTime: modTime,
                        mode:    0755 | os.ModeDir,
-                       sys:     func() interface{} { return &Collection{UUID: uuid, Name: name, ModifiedAt: modTime} },
+                       sys:     func() interface{} { return &Collection{UUID: uuid, Name: name, ModifiedAt: modTime, Properties: props} },
                },
        }
        return &deferrednode{wrapped: placeholder, create: func() inode {
                },
        }
        return &deferrednode{wrapped: placeholder, create: func() inode {
index 716ef34e26098771c44f99c46aeea593ce3e87fb..6719488e23b0dc403dfe3cf9121c973b4bea76a2 100644 (file)
@@ -266,6 +266,8 @@ func (fs *customFileSystem) collectionSingleton(id string) (inode, error) {
                return n, nil
        }
        fs.byID[id] = cfs
                return n, nil
        }
        fs.byID[id] = cfs
+       fs.byIDRoot.Lock()
+       defer fs.byIDRoot.Unlock()
        fs.byIDRoot.Child(id, func(inode) (inode, error) { return cfs, nil })
        return cfs, nil
 }
        fs.byIDRoot.Child(id, func(inode) (inode, error) { return cfs, nil })
        return cfs, nil
 }
@@ -337,6 +339,18 @@ type hardlink struct {
        name   string
 }
 
        name   string
 }
 
+// rootnode returns the root node behind this hardlink: when the
+// wrapped inode itself has a rootnode() method, that method's result
+// is returned; otherwise the wrapped inode is returned as-is.
+// (*fileSystem)Rename() uses this when choosing which node to lock.
+func (hl *hardlink) rootnode() inode {
+       wrapped, hasRoot := hl.inode.(interface{ rootnode() inode })
+       if !hasRoot {
+               return hl.inode
+       }
+       return wrapped.rootnode()
+}
+
 func (hl *hardlink) Sync() error {
        if node, ok := hl.inode.(syncer); ok {
                return node.Sync()
 func (hl *hardlink) Sync() error {
        if node, ok := hl.inode.(syncer); ok {
                return node.Sync()
index 381a1110b131d877c8d6d69f34f63fe0b43b77e8..3c7c2db371f1f3d4adafcb643c24d5978e7db8fc 100644 (file)
@@ -315,7 +315,8 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                return true
        }
                s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                return true
        }
-       if r.Method == http.MethodGet || r.Method == http.MethodHead {
+       readfs := fs
+       if writeMethod[r.Method] {
                // Create a FileSystem for this request, to avoid
                // exposing incomplete write operations to concurrent
                // requests.
                // Create a FileSystem for this request, to avoid
                // exposing incomplete write operations to concurrent
                // requests.
@@ -514,14 +515,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                                return true
                        }
                }
                                return true
                        }
                }
-               err = fs.Sync()
+               err = h.syncCollection(fs, readfs, fspath)
                if err != nil {
                        err = fmt.Errorf("sync failed: %w", err)
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
                if err != nil {
                        err = fmt.Errorf("sync failed: %w", err)
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
-               // Ensure a subsequent read operation will see the changes.
-               h.Cache.ResetSession(token)
                w.WriteHeader(http.StatusOK)
                return true
        case r.Method == http.MethodDelete:
                w.WriteHeader(http.StatusOK)
                return true
        case r.Method == http.MethodDelete:
@@ -568,14 +567,12 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
                        s3ErrorResponse(w, InvalidArgument, err.Error(), r.URL.Path, http.StatusBadRequest)
                        return true
                }
                        s3ErrorResponse(w, InvalidArgument, err.Error(), r.URL.Path, http.StatusBadRequest)
                        return true
                }
-               err = fs.Sync()
+               err = h.syncCollection(fs, readfs, fspath)
                if err != nil {
                        err = fmt.Errorf("sync failed: %w", err)
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
                if err != nil {
                        err = fmt.Errorf("sync failed: %w", err)
                        s3ErrorResponse(w, InternalError, err.Error(), r.URL.Path, http.StatusInternalServerError)
                        return true
                }
-               // Ensure a subsequent read operation will see the changes.
-               h.Cache.ResetSession(token)
                w.WriteHeader(http.StatusNoContent)
                return true
        default:
                w.WriteHeader(http.StatusNoContent)
                return true
        default:
@@ -584,6 +581,29 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
        }
 }
 
        }
 }
 
+// syncCollection propagates the state of one collection from srcfs
+// to dstfs.
+//
+// It looks up the collection that contains path using
+// determineCollection, opens "by_id/<UUID>" in srcfs, calls Sync and
+// then Snapshot on it, and splices that snapshot into the same
+// "by_id/<UUID>" path in dstfs.
+//
+// It returns an error if the collection cannot be determined, or if
+// any of the open, sync, snapshot, or splice steps fails.
+func (h *handler) syncCollection(srcfs, dstfs arvados.CustomFileSystem, path string) error {
+       // Only the collection record is used here; the second return
+       // value of determineCollection is deliberately discarded.
+       coll, _ := h.determineCollection(srcfs, path)
+       if coll == nil || coll.UUID == "" {
+               return errors.New("could not determine collection to sync")
+       }
+       d, err := srcfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777)
+       if err != nil {
+               return err
+       }
+       defer d.Close()
+       // Check the Sync result before going further: otherwise the
+       // Snapshot assignment below would overwrite it, and a failed
+       // sync would go unreported to the caller.
+       if err = d.Sync(); err != nil {
+               return err
+       }
+       snap, err := d.Snapshot()
+       if err != nil {
+               return err
+       }
+       dstd, err := dstfs.OpenFile("by_id/"+coll.UUID, os.O_RDWR, 0777)
+       if err != nil {
+               return err
+       }
+       defer dstd.Close()
+       return dstd.Splice(snap)
+}
+
 func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path string) error {
        maybeEncode := func(s string) string {
                for _, c := range s {
 func setFileInfoHeaders(header http.Header, fs arvados.CustomFileSystem, path string) error {
        maybeEncode := func(s string) string {
                for _, c := range s {
index 476ec9437fe39ca64bbf24b5f87fb5817089ac80..aa91d82ae36ab6c01cb50ef7b7dd944af16b4236 100644 (file)
@@ -367,7 +367,7 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
                if !c.Check(err, check.NotNil) {
                        continue
                }
                if !c.Check(err, check.NotNil) {
                        continue
                }
-               c.Check(err.(*s3.Error).StatusCode, check.Equals, 404)
+               c.Check(err.(*s3.Error).StatusCode, check.Equals, http.StatusNotFound)
                c.Check(err.(*s3.Error).Code, check.Equals, `NoSuchKey`)
                if !c.Check(err, check.ErrorMatches, `The specified key does not exist.`) {
                        continue
                c.Check(err.(*s3.Error).Code, check.Equals, `NoSuchKey`)
                if !c.Check(err, check.ErrorMatches, `The specified key does not exist.`) {
                        continue
@@ -393,10 +393,11 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
 
                // Check that the change is immediately visible via
                // (non-S3) webdav request.
 
                // Check that the change is immediately visible via
                // (non-S3) webdav request.
-               _, resp := s.do("GET", "http://"+collUUID+".keep-web.example/"+trial.path, "", http.Header{
-                       "Authorization": {"Bearer " + arvadostest.ActiveToken},
-               })
+               _, resp := s.do("GET", "http://"+collUUID+".keep-web.example/"+trial.path, arvadostest.ActiveTokenV2, nil)
                c.Check(resp.Code, check.Equals, http.StatusOK)
                c.Check(resp.Code, check.Equals, http.StatusOK)
+               if !strings.HasSuffix(trial.path, "/") {
+                       c.Check(resp.Body.Len(), check.Equals, trial.size)
+               }
        }
 }
 
        }
 }