type collectionFileSystem struct {
fileSystem
uuid string
+ savedPDH atomic.Value
replicas int
storageClasses []string
+ // guessSignatureTTL tracks a lower bound for the server's
+ // configured BlobSigningTTL. The guess is initially zero, and
+ // increases when we come across a signature with an expiry
+ // time further in the future than the previous guess.
+ //
+ // When the guessed TTL is much smaller than the real TTL,
+ // preemptive signature refresh is delayed or missed entirely,
+ // which is OK.
+ guessSignatureTTL time.Duration
+ holdCheckChanges time.Time
+ lockCheckChanges sync.Mutex
}
// FileSystem returns a CollectionFileSystem for the collection.
thr: newThrottle(concurrentWriters),
},
}
+ fs.savedPDH.Store(c.PortableDataHash)
if r := c.ReplicationDesired; r != nil {
fs.replicas = *r
}
return fs, nil
}
-func backdateTree(n inode, modTime time.Time) {
+// caller must have lock (or guarantee no concurrent accesses somehow)
+func eachNode(n inode, ffunc func(*filenode), dfunc func(*dirnode)) {
switch n := n.(type) {
case *filenode:
- n.fileinfo.modTime = modTime
+ if ffunc != nil {
+ ffunc(n)
+ }
case *dirnode:
- n.fileinfo.modTime = modTime
+ if dfunc != nil {
+ dfunc(n)
+ }
for _, n := range n.inodes {
- backdateTree(n, modTime)
+ eachNode(n, ffunc, dfunc)
}
}
}
+// caller must have lock (or guarantee no concurrent accesses somehow)
+func backdateTree(n inode, modTime time.Time) {
+ eachNode(n, func(fn *filenode) {
+ fn.fileinfo.modTime = modTime
+ }, func(dn *dirnode) {
+ dn.fileinfo.modTime = modTime
+ })
+}
+
+// Approximate portion of signature TTL remaining, usually between 0
+// and 1, or negative if some signatures have expired.
+func (fs *collectionFileSystem) signatureTimeLeft() (float64, time.Duration) {
+ var (
+ now = time.Now()
+ earliest = now.Add(time.Hour * 24 * 7 * 365)
+ latest time.Time
+ )
+ fs.fileSystem.root.RLock()
+ eachNode(fs.root, func(fn *filenode) {
+ fn.Lock()
+ defer fn.Unlock()
+ for _, seg := range fn.segments {
+ seg, ok := seg.(storedSegment)
+ if !ok {
+ continue
+ }
+ expiryTime, err := signatureExpiryTime(seg.locator)
+ if err != nil {
+ continue
+ }
+ if expiryTime.Before(earliest) {
+ earliest = expiryTime
+ }
+ if expiryTime.After(latest) {
+ latest = expiryTime
+ }
+ }
+ }, nil)
+ fs.fileSystem.root.RUnlock()
+
+ if latest.IsZero() {
+ // No signatures == 100% of TTL remaining.
+ return 1, 1
+ }
+
+ ttl := latest.Sub(now)
+ fs.fileSystem.root.Lock()
+ {
+ if ttl > fs.guessSignatureTTL {
+ // ttl is closer to the real TTL than
+ // guessSignatureTTL.
+ fs.guessSignatureTTL = ttl
+ } else {
+ // Use the previous best guess to compute the
+ // portion remaining (below, after unlocking
+ // mutex).
+ ttl = fs.guessSignatureTTL
+ }
+ }
+ fs.fileSystem.root.Unlock()
+
+ return earliest.Sub(now).Seconds() / ttl.Seconds(), ttl
+}
+
+func (fs *collectionFileSystem) updateSignatures(newmanifest string) {
+ newLoc := map[string]string{}
+ for _, tok := range regexp.MustCompile(`\S+`).FindAllString(newmanifest, -1) {
+ if mBlkRe.MatchString(tok) {
+ newLoc[stripAllHints(tok)] = tok
+ }
+ }
+ fs.fileSystem.root.Lock()
+ defer fs.fileSystem.root.Unlock()
+ eachNode(fs.root, func(fn *filenode) {
+ fn.Lock()
+ defer fn.Unlock()
+ for idx, seg := range fn.segments {
+ seg, ok := seg.(storedSegment)
+ if !ok {
+ continue
+ }
+ loc, ok := newLoc[stripAllHints(seg.locator)]
+ if !ok {
+ continue
+ }
+ seg.locator = loc
+ fn.segments[idx] = seg
+ }
+ }, nil)
+}
+
func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
if name == "" || name == "." || name == ".." {
return nil, ErrInvalidArgument
return ErrInvalidOperation
}
+// Check for and incorporate upstream changes -- unless that has
+// already been done recently, in which case this func is a no-op.
+func (fs *collectionFileSystem) checkChangesOnServer() error {
+ if fs.uuid == "" && fs.savedPDH.Load() == "" {
+ return nil
+ }
+
+ // First try UUID if any, then last known PDH. Stop if all
+ // signatures are new enough.
+ checkingAll := false
+ for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
+ if id == "" {
+ continue
+ }
+
+ fs.lockCheckChanges.Lock()
+ if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
+ fs.lockCheckChanges.Unlock()
+ return nil
+ }
+ remain, ttl := fs.signatureTimeLeft()
+ if remain > 0.01 && !checkingAll {
+ fs.holdCheckChanges = time.Now().Add(ttl / 100)
+ }
+ fs.lockCheckChanges.Unlock()
+
+ if remain >= 0.5 {
+ break
+ }
+ checkingAll = true
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+ if err != nil {
+ continue
+ }
+ fs.updateSignatures(coll.ManifestText)
+ }
+ return nil
+}
+
+// Refresh signature on a single locator, if necessary. Assume caller
+// has lock. If an update is needed, and there are any storedSegments
+// whose signatures can be updated, start a background task to update
+// them asynchronously when the caller releases locks.
+func (fs *collectionFileSystem) refreshSignature(locator string) string {
+ exp, err := signatureExpiryTime(locator)
+ if err != nil || exp.Sub(time.Now()) > time.Minute {
+ // Synchronous update is not needed. Start an
+ // asynchronous update if needed.
+ go fs.checkChangesOnServer()
+ return locator
+ }
+ var manifests string
+ for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
+ if id == "" {
+ continue
+ }
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+ if err != nil {
+ continue
+ }
+ manifests += coll.ManifestText
+ }
+ hash := stripAllHints(locator)
+ for _, tok := range regexp.MustCompile(`\S+`).FindAllString(manifests, -1) {
+ if mBlkRe.MatchString(tok) {
+ if stripAllHints(tok) == hash {
+ locator = tok
+ break
+ }
+ }
+ }
+ go fs.updateSignatures(manifests)
+ return locator
+}
+
func (fs *collectionFileSystem) Sync() error {
+ err := fs.checkChangesOnServer()
+ if err != nil {
+ return err
+ }
if fs.uuid == "" {
return nil
}
if err != nil {
return fmt.Errorf("sync failed: %s", err)
}
- coll := &Collection{
+ if PortableDataHash(txt) == fs.savedPDH.Load() {
+ // No local changes since last save or initial load.
+ return nil
+ }
+ coll := Collection{
UUID: fs.uuid,
ManifestText: txt,
}
- err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
+
+ selectFields := []string{"uuid", "portable_data_hash"}
+ fs.lockCheckChanges.Lock()
+ remain, _ := fs.signatureTimeLeft()
+ fs.lockCheckChanges.Unlock()
+ if remain < 0.5 {
+ selectFields = append(selectFields, "manifest_text")
+ }
+
+ err = fs.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
"collection": map[string]string{
"manifest_text": coll.ManifestText,
},
- "select": []string{"uuid"},
+ "select": selectFields,
})
if err != nil {
return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
}
+ fs.updateSignatures(coll.ManifestText)
+ fs.savedPDH.Store(coll.PortableDataHash)
return nil
}
err = io.EOF
return
}
+ if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
+ ss.locator = fn.fs.refreshSignature(ss.locator)
+ fn.segments[ptr.segmentIdx] = ss
+ }
n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
if n > 0 {
ptr.off += int64(n)