+// checkChangesOnServer checks for and incorporates upstream changes.
+//
+// When force is false the call is throttled: it returns immediately
+// while fs.holdCheckChanges is still in the future, and it also
+// returns without contacting the server when signatureTimeLeft
+// reports remain >= 0.5. Each call that gets past the hold pushes
+// fs.holdCheckChanges forward by ttl/100, provided remain > 0.01.
+// (remain looks like the fraction of signature lifetime still left --
+// TODO confirm against signatureTimeLeft.)
+//
+// If fs.uuid is set, the collection is fetched by UUID. When its
+// portable data hash differs from the value held in fs.loadedPDH, the
+// local tree is replaced by a snapshot of the upstream content,
+// discarding any unsaved local changes, and the result is true.
+// Otherwise fs.updateSignatures is called with the fetched manifest
+// text. If fs.uuid is empty, the collection is fetched by the loaded
+// PDH and only fs.updateSignatures is called.
+//
+// The bool result is true only if new content was loaded from
+// upstream. The error is whatever the API request, FileSystem,
+// Snapshot or Splice returned, passed through unchanged.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
+	// Load() hands back an interface value. Comparing it directly
+	// with "" is false when it holds nil (nothing stored yet), so
+	// assert to string first: an unset value then counts as empty,
+	// matching how loadedPDH is read further down.
+	if pdh, _ := fs.loadedPDH.Load().(string); fs.uuid == "" && pdh == "" {
+		return false, nil
+	}
+
+	fs.lockCheckChanges.Lock()
+	if !force && fs.holdCheckChanges.After(time.Now()) {
+		fs.lockCheckChanges.Unlock()
+		return false, nil
+	}
+	remain, ttl := fs.signatureTimeLeft()
+	if remain > 0.01 {
+		fs.holdCheckChanges = time.Now().Add(ttl / 100)
+	}
+	// Release the lock before any network round trip below.
+	fs.lockCheckChanges.Unlock()
+
+	if !force && remain >= 0.5 {
+		// plenty of time left on current signatures
+		return false, nil
+	}
+
+	// Re-read the PDH here (not reusing the guard's copy) so the
+	// comparison below uses the value current at request time.
+	loadedPDH, _ := fs.loadedPDH.Load().(string)
+	getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+
+	if fs.uuid == "" {
+		// No UUID: the content cannot change under a fixed PDH, so
+		// the only thing to pick up is fresh signatures.
+		if loadedPDH == "" {
+			return false, nil
+		}
+		var coll Collection
+		if err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+loadedPDH, nil, getparams); err != nil {
+			return false, err
+		}
+		fs.updateSignatures(coll.ManifestText)
+		return false, nil
+	}
+
+	var coll Collection
+	if err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams); err != nil {
+		return false, err
+	}
+	if coll.PortableDataHash == loadedPDH {
+		// Same content upstream; just take the new signatures.
+		fs.updateSignatures(coll.ManifestText)
+		return false, nil
+	}
+
+	// collection has changed upstream since we last loaded or
+	// saved. Refresh local data, losing any unsaved local changes.
+	newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+	if err != nil {
+		return false, err
+	}
+	snap, err := Snapshot(newfs, "/")
+	if err != nil {
+		return false, err
+	}
+	if err := Splice(fs, "/", snap); err != nil {
+		return false, err
+	}
+	fs.loadedPDH.Store(coll.PortableDataHash)
+	// NOTE(review): unchecked assertion -- relies on
+	// Collection.FileSystem returning a *collectionFileSystem.
+	fs.savedPDH.Store(newfs.(*collectionFileSystem).savedPDH.Load())
+	return true, nil
+}
+
+// refreshSignatureTokenRe matches one run of non-whitespace
+// characters, i.e. a single manifest token. It is compiled once here
+// instead of on every refreshSignature call.
+var refreshSignatureTokenRe = regexp.MustCompile(`\S+`)
+
+// refreshSignature returns a locator for the same block as the given
+// locator, with a refreshed signature when one is needed and
+// available.
+//
+// The caller is assumed to hold the lock (which lock is not visible
+// from this function -- confirm at the call sites).
+//
+// If the locator's expiry time cannot be determined, or it is more
+// than a minute away, the locator is returned unchanged and
+// checkChangesOnServer(false) is started in a goroutine so any needed
+// update happens asynchronously.
+//
+// Otherwise the manifests for fs.uuid and the loaded PDH are fetched
+// synchronously (a failed fetch is skipped, best effort) and scanned
+// for the first block token whose hint-stripped form equals that of
+// the given locator. That token is returned; if none is found the
+// original locator is returned. In both cases fs.updateSignatures is
+// started in a goroutine with the fetched manifest text.
+func (fs *collectionFileSystem) refreshSignature(locator string) string {
+	exp, err := signatureExpiryTime(locator)
+	if err != nil || time.Until(exp) > time.Minute {
+		// Synchronous update is not needed. Start an
+		// asynchronous update if needed.
+		go fs.checkChangesOnServer(false)
+		return locator
+	}
+
+	loadedPDH, _ := fs.loadedPDH.Load().(string)
+	getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+	var manifests string
+	for _, id := range []string{fs.uuid, loadedPDH} {
+		if id == "" {
+			continue
+		}
+		var coll Collection
+		if err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, getparams); err != nil {
+			// Best effort: the other source may still supply a
+			// usable signature.
+			continue
+		}
+		manifests += coll.ManifestText
+	}
+
+	hash := stripAllHints(locator)
+	for _, tok := range refreshSignatureTokenRe.FindAllString(manifests, -1) {
+		if mBlkRe.MatchString(tok) && stripAllHints(tok) == hash {
+			locator = tok
+			break
+		}
+	}
+	// Update the remaining signatures in the background.
+	go fs.updateSignatures(manifests)
+	return locator
+}
+