19362: Move non-S3 collection-addressed reqs to sitefs code path.
[arvados.git] / sdk / go / arvados / fs_collection.go
index 26012e240603d0be43a1019346c4e946e2821790..256f513cf9f3e9a126bb918fe18a021ca7ca2e8b 100644 (file)
@@ -290,44 +290,70 @@ func (fs *collectionFileSystem) Truncate(int64) error {
        return ErrInvalidOperation
 }
 
-// Check for and incorporate upstream changes -- unless that has
-// already been done recently, in which case this func is a no-op.
-func (fs *collectionFileSystem) checkChangesOnServer() error {
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
        if fs.uuid == "" && fs.savedPDH.Load() == "" {
-               return nil
+               return false, nil
        }
 
-       // First try UUID if any, then last known PDH. Stop if all
-       // signatures are new enough.
-       checkingAll := false
-       for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
-               if id == "" {
-                       continue
-               }
-
-               fs.lockCheckChanges.Lock()
-               if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
-                       fs.lockCheckChanges.Unlock()
-                       return nil
-               }
-               remain, ttl := fs.signatureTimeLeft()
-               if remain > 0.01 && !checkingAll {
-                       fs.holdCheckChanges = time.Now().Add(ttl / 100)
-               }
+       fs.lockCheckChanges.Lock()
+       if !force && fs.holdCheckChanges.After(time.Now()) {
                fs.lockCheckChanges.Unlock()
+               return false, nil
+       }
+       remain, ttl := fs.signatureTimeLeft()
+       if remain > 0.01 {
+               fs.holdCheckChanges = time.Now().Add(ttl / 100)
+       }
+       fs.lockCheckChanges.Unlock()
 
-               if remain >= 0.5 {
-                       break
+       if !force && remain >= 0.5 {
+               // plenty of time left on current signatures
+               return false, nil
+       }
+
+       getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+       if fs.uuid != "" {
+               var coll Collection
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+               if err != nil {
+                       return false, err
+               }
+               if coll.PortableDataHash != fs.savedPDH.Load().(string) {
+                       // collection has changed upstream since we
+                       // last loaded or saved. Refresh local data,
+                       // losing any unsaved local changes.
+                       newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+                       if err != nil {
+                               return false, err
+                       }
+                       snap, err := Snapshot(newfs, "/")
+                       if err != nil {
+                               return false, err
+                       }
+                       err = Splice(fs, "/", snap)
+                       if err != nil {
+                               return false, err
+                       }
+                       fs.savedPDH.Store(coll.PortableDataHash)
+                       return true, nil
                }
-               checkingAll = true
+               fs.updateSignatures(coll.ManifestText)
+               return false, nil
+       }
+       if pdh := fs.savedPDH.Load().(string); pdh != "" {
                var coll Collection
-               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+               err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams)
                if err != nil {
-                       continue
+                       return false, err
                }
                fs.updateSignatures(coll.ManifestText)
        }
-       return nil
+       return false, nil
 }
 
 // Refresh signature on a single locator, if necessary. Assume caller
@@ -339,7 +365,7 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
        if err != nil || exp.Sub(time.Now()) > time.Minute {
                // Synchronous update is not needed. Start an
                // asynchronous update if needed.
-               go fs.checkChangesOnServer()
+               go fs.checkChangesOnServer(false)
                return locator
        }
        var manifests string
@@ -368,11 +394,11 @@ func (fs *collectionFileSystem) refreshSignature(locator string) string {
 }
 
 func (fs *collectionFileSystem) Sync() error {
-       err := fs.checkChangesOnServer()
+       refreshed, err := fs.checkChangesOnServer(true)
        if err != nil {
                return err
        }
-       if fs.uuid == "" {
+       if refreshed || fs.uuid == "" {
                return nil
        }
        txt, err := fs.MarshalManifest(".")
@@ -403,7 +429,7 @@ func (fs *collectionFileSystem) Sync() error {
                "select": selectFields,
        })
        if err != nil {
-               return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+               return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
        }
        fs.updateSignatures(coll.ManifestText)
        fs.savedPDH.Store(coll.PortableDataHash)
@@ -1163,10 +1189,11 @@ func (dn *dirnode) MemorySize() (size int64) {
                        size += 64
                        for _, seg := range node.segments {
                                switch seg := seg.(type) {
+                               case storedSegment:
+                                       size += int64(len(seg.locator)) + 40
                                case *memSegment:
-                                       size += int64(seg.Len())
+                                       size += int64(seg.Len()) + 8
                                }
-                               size += 64
                        }
                }
        }