type collectionFileSystem struct {
fileSystem
uuid string
- savedPDH atomic.Value
replicas int
storageClasses []string
+
+ // PDH returned by the server as of last sync/load.
+ loadedPDH atomic.Value
+ // PDH of the locally generated manifest as of the last
+ // sync/load. This can differ from loadedPDH after loading a
+ // version that was generated by different code that sorts
+ // filenames differently than we do, for example.
+ savedPDH atomic.Value
+
// guessSignatureTTL tracks a lower bound for the server's
// configured BlobSigningTTL. The guess is initially zero, and
// increases when we come across a signature with an expiry
thr: newThrottle(concurrentWriters),
},
}
- fs.savedPDH.Store(c.PortableDataHash)
+ fs.loadedPDH.Store(c.PortableDataHash)
if r := c.ReplicationDesired; r != nil {
fs.replicas = *r
}
name: ".",
mode: os.ModeDir | 0755,
modTime: modTime,
+ sys: func() interface{} { return c },
},
inodes: make(map[string]inode),
},
if err := root.loadManifest(c.ManifestText); err != nil {
return nil, err
}
+
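+ // Compute the PDH of our own, locally generated manifest,
+ // which can differ from the server-reported PDH (see the
+ // savedPDH field comment).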
+ txt, err := root.marshalManifest(context.Background(), ".", false)
+ if err != nil {
+ return nil, err
+ }
+ fs.savedPDH.Store(PortableDataHash(txt))
+
backdateTree(root, modTime)
fs.root = root
return fs, nil
return ErrInvalidOperation
}
-// Check for and incorporate upstream changes -- unless that has
-// already been done recently, in which case this func is a no-op.
-func (fs *collectionFileSystem) checkChangesOnServer() error {
- if fs.uuid == "" && fs.savedPDH.Load() == "" {
- return nil
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
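+// (With a two-week blob signing TTL, for example, that means
+// at most one upstream check per ~3.4 hours.)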
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
+ if fs.uuid == "" && fs.loadedPDH.Load() == "" {
+ return false, nil
}
- // First try UUID if any, then last known PDH. Stop if all
- // signatures are new enough.
- checkingAll := false
- for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
- if id == "" {
- continue
- }
-
- fs.lockCheckChanges.Lock()
- if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
- fs.lockCheckChanges.Unlock()
- return nil
- }
- remain, ttl := fs.signatureTimeLeft()
- if remain > 0.01 && !checkingAll {
- fs.holdCheckChanges = time.Now().Add(ttl / 100)
- }
+ fs.lockCheckChanges.Lock()
+ if !force && fs.holdCheckChanges.After(time.Now()) {
fs.lockCheckChanges.Unlock()
+ return false, nil
+ }
+ remain, ttl := fs.signatureTimeLeft()
+ if remain > 0.01 {
+ fs.holdCheckChanges = time.Now().Add(ttl / 100)
+ }
+ fs.lockCheckChanges.Unlock()
- if remain >= 0.5 {
- break
+ if !force && remain >= 0.5 {
+ // plenty of time left on current signatures
+ return false, nil
+ }
+
+ loadedPDH, _ := fs.loadedPDH.Load().(string)
+ getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+ if fs.uuid != "" {
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+ if err != nil {
+ return false, err
+ }
+ if coll.PortableDataHash != loadedPDH {
+ // collection has changed upstream since we
+ // last loaded or saved. Refresh local data,
+ // losing any unsaved local changes.
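+ // We do this by splicing a snapshot of the
+ // newly loaded collection onto our root.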
+ newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+ if err != nil {
+ return false, err
+ }
+ snap, err := Snapshot(newfs, "/")
+ if err != nil {
+ return false, err
+ }
+ err = Splice(fs, "/", snap)
+ if err != nil {
+ return false, err
+ }
+ fs.loadedPDH.Store(coll.PortableDataHash)
+ fs.savedPDH.Store(newfs.(*collectionFileSystem).savedPDH.Load())
+ return true, nil
}
- checkingAll = true
+ fs.updateSignatures(coll.ManifestText)
+ return false, nil
+ }
+ if loadedPDH != "" {
var coll Collection
- err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+loadedPDH, nil, getparams)
if err != nil {
- continue
+ return false, err
}
fs.updateSignatures(coll.ManifestText)
}
- return nil
+ return false, nil
}
// Refresh signature on a single locator, if necessary. Assume caller
if err != nil || exp.Sub(time.Now()) > time.Minute {
// Synchronous update is not needed. Start an
// asynchronous update if needed.
- go fs.checkChangesOnServer()
+ go fs.checkChangesOnServer(false)
return locator
}
+ loadedPDH, _ := fs.loadedPDH.Load().(string)
var manifests string
- for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
+ for _, id := range []string{fs.uuid, loadedPDH} {
if id == "" {
continue
}
}
func (fs *collectionFileSystem) Sync() error {
- err := fs.checkChangesOnServer()
+ refreshed, err := fs.checkChangesOnServer(true)
if err != nil {
return err
}
- if fs.uuid == "" {
+ if refreshed || fs.uuid == "" {
return nil
}
txt, err := fs.MarshalManifest(".")
if err != nil {
return fmt.Errorf("sync failed: %s", err)
}
- if PortableDataHash(txt) == fs.savedPDH.Load() {
+ savingPDH := PortableDataHash(txt)
+ if savingPDH == fs.savedPDH.Load() {
// No local changes since last save or initial load.
return nil
}
"select": selectFields,
})
if err != nil {
- return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+ return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
}
fs.updateSignatures(coll.ManifestText)
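+ // Track both PDHs: the server-reported one may differ
+ // from the PDH of the manifest we generated (see the
+ // loadedPDH/savedPDH field comments).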
- fs.savedPDH.Store(coll.PortableDataHash)
+ fs.loadedPDH.Store(coll.PortableDataHash)
+ fs.savedPDH.Store(savingPDH)
return nil
}
func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
- node, err := rlookup(fs.fileSystem.root, path)
+ node, err := rlookup(fs.fileSystem.root, path, nil)
if err != nil {
return err
}
}
func (fs *collectionFileSystem) MemorySize() int64 {
- fs.fileSystem.root.Lock()
- defer fs.fileSystem.root.Unlock()
return fs.fileSystem.root.(*dirnode).MemorySize()
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
+ return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, true)
}
func (fs *collectionFileSystem) Size() int64 {
//
// After seeking:
//
-// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
-// ||
-// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+// ||
+// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
ptr = startPtr
if ptr.off < 0 {
return fn.fs
}
+func (fn *filenode) MemorySize() (size int64) {
+ fn.RLock()
+ defer fn.RUnlock()
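+ // Fixed per-filenode cost, plus each segment's footprint.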
+ size = 64
+ for _, seg := range fn.segments {
+ size += seg.memorySize()
+ }
+ return
+}
+
// Read reads file data from a single segment, starting at startPtr,
// into p. startPtr is assumed not to be up-to-date. Caller must have
// RLock or Lock.
return cg.Wait()
}
-// caller must have write lock.
func (dn *dirnode) MemorySize() (size int64) {
- for _, name := range dn.sortedNames() {
- node := dn.inodes[name]
- node.Lock()
- defer node.Unlock()
- switch node := node.(type) {
- case *dirnode:
- size += node.MemorySize()
- case *filenode:
- size += 64
- for _, seg := range node.segments {
- switch seg := seg.(type) {
- case *memSegment:
- size += int64(seg.Len())
- }
- size += 64
- }
- }
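+ // Copy the child list under RLock, then release the
+ // lock before recursing, so we don't hold dn's lock
+ // while acquiring the children's locks.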
+ dn.RLock()
+ todo := make([]inode, 0, len(dn.inodes))
+ for _, node := range dn.inodes {
+ todo = append(todo, node)
}
- return 64 + size
+ dn.RUnlock()
+ size = 64
+ for _, node := range todo {
+ size += node.MemorySize()
+ }
+ return
}
// caller must have write lock.
}
// caller must have write lock.
-func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, flush bool) (string, error) {
cg := newContextGroup(ctx)
defer cg.Cancel()
for i, name := range dirnames {
i, name := i, name
cg.Go(func() error {
- txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
+ txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, flush)
subdirs[i] = txt
return err
})
var fileparts []filepart
var blocks []string
- if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
+ if !flush {
+ // Skip the flush; the marshal loop below
+ // will fail if anything needed flushing.
+ } else if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
return err
}
for _, name := range filenames {
}
streamLen += int64(seg.size)
default:
- // This can't happen: we
- // haven't unlocked since
+ // If flush==true this can't
+ // happen: we haven't unlocked since
// calling flush(sync=true).
- panic(fmt.Sprintf("can't marshal segment type %T", seg))
+ // Evidently the caller passed
+ // flush==false even though there
+ // were unflushed local changes.
+ return fmt.Errorf("can't marshal segment type %T", seg)
}
}
}
}
streams = streams[:len(streams)-1]
segments := []storedSegment{}
+ // streamoffset[n] is the position in the stream of the nth
+ // block, i.e., ∑ segments[j].size ∀ 0≤j<n. We ensure
+ // len(streamoffset) == len(segments) + 1.
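+ // For example, with block sizes 3, 5, and 2,
+ // streamoffset is [0, 3, 8, 10].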
+ streamoffset := []int64{0}
// To reduce allocs, we reuse a single "pathparts" slice
// (pre-split on "/" separators) for the duration of this
// func.
}
for i, stream := range streams {
lineno := i + 1
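+ // fnodeCache saves a createFileAndParents() call when
+ // multiple file tokens on this stream line refer to the
+ // same file.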
+ fnodeCache := make(map[string]*filenode)
var anyFileTokens bool
- var pos int64
var segIdx int
segments = segments[:0]
+ streamoffset = streamoffset[:1]
pathparts = nil
streamparts := 0
for i, token := range bytes.Split(stream, []byte{' '}) {
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
+ streamoffset = append(streamoffset, streamoffset[len(segments)]+int64(length))
segments = append(segments, storedSegment{
locator: string(token),
size: int(length),
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- if !bytes.ContainsAny(toks[2], `\/`) {
- // optimization for a common case
- pathparts = append(pathparts[:streamparts], string(toks[2]))
- } else {
- pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+ fnode, cached := fnodeCache[string(toks[2])]
+ if !cached {
+ if !bytes.ContainsAny(toks[2], `\/`) {
+ // optimization for a common case
+ pathparts = append(pathparts[:streamparts], string(toks[2]))
+ } else {
+ pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+ }
+ fnode, err = dn.createFileAndParents(pathparts)
+ if err != nil {
+ return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
+ }
+ fnodeCache[string(toks[2])] = fnode
}
- fnode, err := dn.createFileAndParents(pathparts)
- if fnode == nil && err == nil && length == 0 {
+ if fnode == nil {
+ // name matches an existing directory
+ if length != 0 {
+ return fmt.Errorf("line %d: cannot use name %q with length %d: is a directory", lineno, toks[2], length)
+ }
// Special case: an empty file used as
// a marker to preserve an otherwise
// empty directory in a manifest.
continue
}
- if err != nil || (fnode == nil && length != 0) {
- return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
- }
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
// corresponding storedSegments to the filenode
- if pos > offset {
- // Can't continue where we left off.
- // TODO: binary search instead of
- // rewinding all the way (but this
- // situation might be rare anyway)
- segIdx, pos = 0, 0
+ if segIdx < len(segments) && streamoffset[segIdx] <= offset && streamoffset[segIdx+1] > offset {
+ // common case with an easy
+ // optimization: start where the
+ // previous segment ended
+ } else if guess := int(offset >> 26); guess >= 0 && guess < len(segments) && streamoffset[guess] <= offset && streamoffset[guess+1] > offset {
+ // another common case with an easy
+ // optimization: all blocks are 64 MiB
+ // (or close enough)
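+ // (offset>>26 is the offset in
+ // 64 MiB units.)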
+ segIdx = guess
+ } else {
+ // general case
+ segIdx = sort.Search(len(segments), func(i int) bool {
+ return streamoffset[i+1] > offset
+ })
}
for ; segIdx < len(segments); segIdx++ {
- seg := segments[segIdx]
- next := pos + int64(seg.Len())
- if next <= offset || seg.Len() == 0 {
- pos = next
- continue
- }
- if pos >= offset+length {
+ blkStart := streamoffset[segIdx]
+ if blkStart >= offset+length {
break
}
+ seg := &segments[segIdx]
+ if seg.size == 0 {
+ continue
+ }
var blkOff int
- if pos < offset {
- blkOff = int(offset - pos)
+ if blkStart < offset {
+ blkOff = int(offset - blkStart)
}
- blkLen := seg.Len() - blkOff
- if pos+int64(blkOff+blkLen) > offset+length {
- blkLen = int(offset + length - pos - int64(blkOff))
+ blkLen := seg.size - blkOff
+ if blkStart+int64(seg.size) > offset+length {
+ blkLen = int(offset + length - blkStart - int64(blkOff))
}
fnode.appendSegment(storedSegment{
kc: dn.fs,
offset: blkOff,
length: blkLen,
})
- if next > offset+length {
- break
- } else {
- pos = next
- }
}
- if segIdx == len(segments) && pos < offset+length {
- return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
+ if segIdx == len(segments) && streamoffset[segIdx] < offset+length {
+ return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, streamoffset[segIdx], token)
}
}
if !anyFileTokens {
// Return a new segment with a subsection of the data from this
// one. length<0 means length=Len()-off.
Slice(off int, length int) segment
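+
+ // Return the approximate memory footprint of this
+ // segment, in bytes.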
+ memorySize() int64
}
type memSegment struct {
return
}
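+// memorySize returns an approximate in-memory footprint: a fixed
+// per-segment cost plus the size of the buffered data.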
+func (me *memSegment) memorySize() int64 {
+ return 64 + int64(len(me.buf))
+}
+
type storedSegment struct {
kc fsBackend
locator string
return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
}
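+// memorySize returns an approximate in-memory footprint. The
+// block data lives in Keep, not in memory, so only the locator
+// string contributes beyond the fixed per-segment cost.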
+func (se storedSegment) memorySize() int64 {
+ return 64 + int64(len(se.locator))
+}
+
func canonicalName(name string) string {
name = path.Clean("/" + name)
if name == "/" || name == "./" {