package arvados
import (
+ "bytes"
"context"
"encoding/json"
"fmt"
type collectionFileSystem struct {
fileSystem
- uuid string
+ uuid string
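+ // Desired replication and storage classes for newly written
+ // blocks, copied from the collection's ReplicationDesired and
+ // StorageClassesDesired fields.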
+ replicas int
+ storageClasses []string
+
+ // PDH returned by the server as of last sync/load.
+ loadedPDH atomic.Value
+ // PDH of the locally generated manifest as of last
+ // sync/load. This can differ from loadedPDH after loading a
+ // version that was generated with different code and sorts
+ // filenames differently than we do, for example.
+ savedPDH atomic.Value
+
+ // guessSignatureTTL tracks a lower bound for the server's
+ // configured BlobSigningTTL. The guess is initially zero, and
+ // increases when we come across a signature with an expiry
+ // time further in the future than the previous guess.
+ //
+ // When the guessed TTL is much smaller than the real TTL,
+ // preemptive signature refresh is delayed or missed entirely,
+ // which is OK.
+ guessSignatureTTL time.Duration
+ holdCheckChanges time.Time
+ lockCheckChanges sync.Mutex
}
// FileSystem returns a CollectionFileSystem for the collection.
modTime = time.Now()
}
fs := &collectionFileSystem{
- uuid: c.UUID,
+ uuid: c.UUID,
+ storageClasses: c.StorageClassesDesired,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
thr: newThrottle(concurrentWriters),
},
}
+ fs.loadedPDH.Store(c.PortableDataHash)
+ if r := c.ReplicationDesired; r != nil {
+ fs.replicas = *r
+ }
root := &dirnode{
fs: fs,
treenode: treenode{
name: ".",
mode: os.ModeDir | 0755,
modTime: modTime,
+ sys: func() interface{} { return c },
},
inodes: make(map[string]inode),
},
if err := root.loadManifest(c.ManifestText); err != nil {
return nil, err
}
+
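+ // Compute the PDH of the manifest as we marshal it locally.
+ // Sync() compares new manifests against this, rather than
+ // loadedPDH, so unsaved changes are detected even if the
+ // loaded manifest was generated by code that sorts filenames
+ // differently.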
+ txt, err := root.marshalManifest(context.Background(), ".", false)
+ if err != nil {
+ return nil, err
+ }
+ fs.savedPDH.Store(PortableDataHash(txt))
+
backdateTree(root, modTime)
fs.root = root
return fs, nil
}
-func backdateTree(n inode, modTime time.Time) {
+// caller must have lock (or guarantee no concurrent accesses somehow)
+func eachNode(n inode, ffunc func(*filenode), dfunc func(*dirnode)) {
switch n := n.(type) {
case *filenode:
- n.fileinfo.modTime = modTime
+ if ffunc != nil {
+ ffunc(n)
+ }
case *dirnode:
- n.fileinfo.modTime = modTime
+ if dfunc != nil {
+ dfunc(n)
+ }
for _, n := range n.inodes {
- backdateTree(n, modTime)
+ eachNode(n, ffunc, dfunc)
}
}
}
+// caller must have lock (or guarantee no concurrent accesses somehow)
+func backdateTree(n inode, modTime time.Time) {
+ eachNode(n, func(fn *filenode) {
+ fn.fileinfo.modTime = modTime
+ }, func(dn *dirnode) {
+ dn.fileinfo.modTime = modTime
+ })
+}
+
+// Approximate portion of signature TTL remaining, usually between 0
+// and 1, or negative if some signatures have expired. The second
+// return value is the current guessed TTL.
+func (fs *collectionFileSystem) signatureTimeLeft() (float64, time.Duration) {
+ var (
+ now = time.Now()
+ earliest = now.Add(time.Hour * 24 * 7 * 365)
+ latest time.Time
+ )
+ fs.fileSystem.root.RLock()
+ eachNode(fs.root, func(fn *filenode) {
+ fn.Lock()
+ defer fn.Unlock()
+ for _, seg := range fn.segments {
+ seg, ok := seg.(storedSegment)
+ if !ok {
+ continue
+ }
+ expiryTime, err := signatureExpiryTime(seg.locator)
+ if err != nil {
+ continue
+ }
+ if expiryTime.Before(earliest) {
+ earliest = expiryTime
+ }
+ if expiryTime.After(latest) {
+ latest = expiryTime
+ }
+ }
+ }, nil)
+ fs.fileSystem.root.RUnlock()
+
+ if latest.IsZero() {
+ // No signatures == 100% of TTL remaining.
+ return 1, 1
+ }
+
+ ttl := latest.Sub(now)
+ fs.fileSystem.root.Lock()
+ {
+ if ttl > fs.guessSignatureTTL {
+ // ttl is closer to the real TTL than
+ // guessSignatureTTL.
+ fs.guessSignatureTTL = ttl
+ } else {
+ // Use the previous best guess to compute the
+ // portion remaining (below, after unlocking
+ // mutex).
+ ttl = fs.guessSignatureTTL
+ }
+ }
+ fs.fileSystem.root.Unlock()
+
+ return earliest.Sub(now).Seconds() / ttl.Seconds(), ttl
+}
+
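+// updateSignatures replaces the locators of this filesystem's stored
+// segments with the (freshly signed) locators found in newmanifest,
+// matching blocks by unsigned locator.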
+func (fs *collectionFileSystem) updateSignatures(newmanifest string) {
+ newLoc := map[string]string{}
+ for _, tok := range regexp.MustCompile(`\S+`).FindAllString(newmanifest, -1) {
+ if mBlkRe.MatchString(tok) {
+ newLoc[stripAllHints(tok)] = tok
+ }
+ }
+ fs.fileSystem.root.Lock()
+ defer fs.fileSystem.root.Unlock()
+ eachNode(fs.root, func(fn *filenode) {
+ fn.Lock()
+ defer fn.Unlock()
+ for idx, seg := range fn.segments {
+ seg, ok := seg.(storedSegment)
+ if !ok {
+ continue
+ }
+ loc, ok := newLoc[stripAllHints(seg.locator)]
+ if !ok {
+ continue
+ }
+ seg.locator = loc
+ fn.segments[idx] = seg
+ }
+ }, nil)
+}
+
func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
if name == "" || name == "." || name == ".." {
return nil, ErrInvalidArgument
return ErrInvalidOperation
}
+// Check for and incorporate upstream changes. If force==false, this
+// is a no-op except once every ttl/100 or so.
+//
+// Return value is true if new content was loaded from upstream and
+// any unsaved local changes have been discarded.
+func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
+ if fs.uuid == "" && fs.loadedPDH.Load() == "" {
+ return false, nil
+ }
+
+ fs.lockCheckChanges.Lock()
+ if !force && fs.holdCheckChanges.After(time.Now()) {
+ fs.lockCheckChanges.Unlock()
+ return false, nil
+ }
+ remain, ttl := fs.signatureTimeLeft()
+ if remain > 0.01 {
+ fs.holdCheckChanges = time.Now().Add(ttl / 100)
+ }
+ fs.lockCheckChanges.Unlock()
+
+ if !force && remain >= 0.5 {
+ // plenty of time left on current signatures
+ return false, nil
+ }
+
+ loadedPDH, _ := fs.loadedPDH.Load().(string)
+ getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
+ if fs.uuid != "" {
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
+ if err != nil {
+ return false, err
+ }
+ if coll.PortableDataHash != loadedPDH {
+ // collection has changed upstream since we
+ // last loaded or saved. Refresh local data,
+ // losing any unsaved local changes.
+ newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
+ if err != nil {
+ return false, err
+ }
+ snap, err := Snapshot(newfs, "/")
+ if err != nil {
+ return false, err
+ }
+ err = Splice(fs, "/", snap)
+ if err != nil {
+ return false, err
+ }
+ fs.loadedPDH.Store(coll.PortableDataHash)
+ fs.savedPDH.Store(newfs.(*collectionFileSystem).savedPDH.Load())
+ return true, nil
+ }
+ fs.updateSignatures(coll.ManifestText)
+ return false, nil
+ }
+ if loadedPDH != "" {
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+loadedPDH, nil, getparams)
+ if err != nil {
+ return false, err
+ }
+ fs.updateSignatures(coll.ManifestText)
+ }
+ return false, nil
+}
+
+// Refresh signature on a single locator, if necessary. Assume caller
+// has lock. If an update is needed, and there are any storedSegments
+// whose signatures can be updated, start a background task to update
+// them asynchronously when the caller releases locks.
+func (fs *collectionFileSystem) refreshSignature(locator string) string {
+ exp, err := signatureExpiryTime(locator)
+ if err != nil || exp.Sub(time.Now()) > time.Minute {
+ // Synchronous update is not needed. Start an
+ // asynchronous update if needed.
+ go fs.checkChangesOnServer(false)
+ return locator
+ }
+ loadedPDH, _ := fs.loadedPDH.Load().(string)
+ var manifests string
+ for _, id := range []string{fs.uuid, loadedPDH} {
+ if id == "" {
+ continue
+ }
+ var coll Collection
+ err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
+ if err != nil {
+ continue
+ }
+ manifests += coll.ManifestText
+ }
+ hash := stripAllHints(locator)
+ for _, tok := range regexp.MustCompile(`\S+`).FindAllString(manifests, -1) {
+ if mBlkRe.MatchString(tok) {
+ if stripAllHints(tok) == hash {
+ locator = tok
+ break
+ }
+ }
+ }
+ go fs.updateSignatures(manifests)
+ return locator
+}
+
func (fs *collectionFileSystem) Sync() error {
- if fs.uuid == "" {
+ refreshed, err := fs.checkChangesOnServer(true)
+ if err != nil {
+ return err
+ }
+ if refreshed || fs.uuid == "" {
return nil
}
txt, err := fs.MarshalManifest(".")
if err != nil {
return fmt.Errorf("sync failed: %s", err)
}
- coll := &Collection{
+ savingPDH := PortableDataHash(txt)
+ if savingPDH == fs.savedPDH.Load() {
+ // No local changes since last save or initial load.
+ return nil
+ }
+ coll := Collection{
UUID: fs.uuid,
ManifestText: txt,
}
- err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
+
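+ // Ask the server to return manifest_text (with fresh block
+ // signatures) only if our current signatures are nearing
+ // expiry; otherwise we have no need for it.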
+ selectFields := []string{"uuid", "portable_data_hash"}
+ fs.lockCheckChanges.Lock()
+ remain, _ := fs.signatureTimeLeft()
+ fs.lockCheckChanges.Unlock()
+ if remain < 0.5 {
+ selectFields = append(selectFields, "manifest_text")
+ }
+
+ err = fs.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
"collection": map[string]string{
"manifest_text": coll.ManifestText,
},
- "select": []string{"uuid"},
+ "select": selectFields,
})
if err != nil {
- return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+ return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
}
+ fs.updateSignatures(coll.ManifestText)
+ fs.loadedPDH.Store(coll.PortableDataHash)
+ fs.savedPDH.Store(savingPDH)
return nil
}
}
func (fs *collectionFileSystem) MemorySize() int64 {
- fs.fileSystem.root.Lock()
- defer fs.fileSystem.root.Unlock()
return fs.fileSystem.root.(*dirnode).MemorySize()
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
+ return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, true)
}
func (fs *collectionFileSystem) Size() int64 {
return fs.fileSystem.root.(*dirnode).TreeSize()
}
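+// Snapshot returns a copy of the collection's entire directory tree,
+// detached from this filesystem.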
+func (fs *collectionFileSystem) Snapshot() (inode, error) {
+ return fs.fileSystem.root.Snapshot()
+}
+
+func (fs *collectionFileSystem) Splice(r inode) error {
+ return fs.fileSystem.root.Splice(r)
+}
+
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
//
// After seeking:
//
-// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
-// ||
-// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+// ||
+// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
ptr = startPtr
if ptr.off < 0 {
// filenode implements inode.
type filenode struct {
parent inode
- fs FileSystem
+ fs *collectionFileSystem
fileinfo fileinfo
segments []segment
// number of times `segments` has changed in a
return fn.fs
}
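+// MemorySize returns an estimate of the memory consumed by this
+// file's in-memory state: a fixed per-node overhead plus the
+// memorySize of each segment.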
+func (fn *filenode) MemorySize() (size int64) {
+ fn.RLock()
+ defer fn.RUnlock()
+ size = 64
+ for _, seg := range fn.segments {
+ size += seg.memorySize()
+ }
+ return
+}
+
// Read reads file data from a single segment, starting at startPtr,
// into p. startPtr is assumed not to be up-to-date. Caller must have
// RLock or Lock.
err = io.EOF
return
}
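+ // If the block signature is close to expiring, refresh it
+ // before reading. (When plenty of time remains,
+ // refreshSignature just schedules a background check and
+ // returns the locator unchanged.)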
+ if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
+ ss.locator = fn.fs.refreshSignature(ss.locator)
+ fn.segments[ptr.segmentIdx] = ss
+ }
n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
if n > 0 {
ptr.off += int64(n)
fn.fs.throttle().Acquire()
go func() {
defer close(done)
- locator, _, err := fn.FS().PutB(buf)
+ resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
+ Data: buf,
+ Replicas: fn.fs.replicas,
+ StorageClasses: fn.fs.storageClasses,
+ })
fn.fs.throttle().Release()
fn.Lock()
defer fn.Unlock()
fn.memsize -= int64(len(buf))
fn.segments[idx] = storedSegment{
kc: fn.FS(),
- locator: locator,
+ locator: resp.Locator,
size: len(buf),
offset: 0,
length: len(buf),
}
}
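+// Snapshot returns a copy of this filenode with its own copy of the
+// segment list, suitable for splicing into another tree.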
+func (fn *filenode) Snapshot() (inode, error) {
+ fn.RLock()
+ defer fn.RUnlock()
+ segments := make([]segment, 0, len(fn.segments))
+ for _, seg := range fn.segments {
+ segments = append(segments, seg.Slice(0, seg.Len()))
+ }
+ return &filenode{
+ fileinfo: fn.fileinfo,
+ segments: segments,
+ }, nil
+}
+
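+// Splice replaces this file with a snapshot of repl (a file or a
+// directory), attaching it to the parent directory under the same
+// name.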
+func (fn *filenode) Splice(repl inode) error {
+ repl, err := repl.Snapshot()
+ if err != nil {
+ return err
+ }
+ fn.parent.Lock()
+ defer fn.parent.Unlock()
+ fn.Lock()
+ defer fn.Unlock()
+ _, err = fn.parent.Child(fn.fileinfo.name, func(inode) (inode, error) { return repl, nil })
+ if err != nil {
+ return err
+ }
+ switch repl := repl.(type) {
+ case *dirnode:
+ repl.parent = fn.parent
+ repl.fileinfo.name = fn.fileinfo.name
+ repl.setTreeFS(fn.fs)
+ case *filenode:
+ repl.parent = fn.parent
+ repl.fileinfo.name = fn.fileinfo.name
+ repl.fs = fn.fs
+ default:
+ return fmt.Errorf("cannot splice snapshot containing %T: %w", repl, ErrInvalidArgument)
+ }
+ return nil
+}
+
type dirnode struct {
fs *collectionFileSystem
treenode
go func() {
defer close(done)
defer close(errs)
- locator, _, err := dn.fs.PutB(block)
+ resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
+ Data: block,
+ Replicas: dn.fs.replicas,
+ StorageClasses: dn.fs.storageClasses,
+ })
dn.fs.throttle().Release()
if err != nil {
errs <- err
data := ref.fn.segments[ref.idx].(*memSegment).buf
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
- locator: locator,
+ locator: resp.Locator,
size: blocksize,
offset: offsets[idx],
length: len(data),
return cg.Wait()
}
-// caller must have write lock.
func (dn *dirnode) MemorySize() (size int64) {
- for _, name := range dn.sortedNames() {
- node := dn.inodes[name]
- node.Lock()
- defer node.Unlock()
- switch node := node.(type) {
- case *dirnode:
- size += node.MemorySize()
- case *filenode:
- for _, seg := range node.segments {
- switch seg := seg.(type) {
- case *memSegment:
- size += int64(seg.Len())
- }
- }
- }
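+ // Copy the child list under RLock, then release the lock
+ // before asking each child for its size, so we don't hold
+ // this directory's lock during the recursive calls.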
+ dn.RLock()
+ todo := make([]inode, 0, len(dn.inodes))
+ for _, node := range dn.inodes {
+ todo = append(todo, node)
+ }
+ dn.RUnlock()
+ size = 64
+ for _, node := range todo {
+ size += node.MemorySize()
}
return
}
}
// caller must have write lock.
-func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, flush bool) (string, error) {
cg := newContextGroup(ctx)
defer cg.Cancel()
for i, name := range dirnames {
i, name := i, name
cg.Go(func() error {
- txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
+ txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, flush)
subdirs[i] = txt
return err
})
var fileparts []filepart
var blocks []string
- if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
+ if !flush {
+ // skip flush -- will fail below if anything
+ // needed flushing
+ } else if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
return err
}
for _, name := range filenames {
}
streamLen += int64(seg.size)
default:
- // This can't happen: we
- // haven't unlocked since
+ // We haven't unlocked since
// calling flush(sync=true).
- panic(fmt.Sprintf("can't marshal segment type %T", seg))
+ // Evidently the caller passed
+ // flush==false but there were
+ // local changes.
+ return fmt.Errorf("can't marshal segment type %T", seg)
}
}
}
}
func (dn *dirnode) loadManifest(txt string) error {
- var dirname string
- streams := strings.Split(txt, "\n")
- if streams[len(streams)-1] != "" {
+ streams := bytes.Split([]byte(txt), []byte{'\n'})
+ if len(streams[len(streams)-1]) != 0 {
return fmt.Errorf("line %d: no trailing newline", len(streams))
}
streams = streams[:len(streams)-1]
segments := []storedSegment{}
+ // To reduce allocs, we reuse a single "pathparts" slice
+ // (pre-split on "/" separators) for the duration of this
+ // func.
+ var pathparts []string
+ // To reduce allocs, we reuse a single "toks" slice of 3 byte
+ // slices.
+ var toks = make([][]byte, 3)
+ // Similar to bytes.SplitN(token, []byte{c}, 3), but splits
+ // into the toks slice rather than allocating a new one, and
+ // returns the number of toks (1, 2, or 3).
+ splitToToks := func(src []byte, c rune) int {
+ c1 := bytes.IndexRune(src, c)
+ if c1 < 0 {
+ toks[0] = src
+ return 1
+ }
+ toks[0], src = src[:c1], src[c1+1:]
+ c2 := bytes.IndexRune(src, c)
+ if c2 < 0 {
+ toks[1] = src
+ return 2
+ }
+ toks[1], toks[2] = src[:c2], src[c2+1:]
+ return 3
+ }
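+ // Each stream line consists of a stream name, one or more
+ // block locators (hash+size, possibly with hints/signatures),
+ // and one or more file segment tokens (offset:length:name),
+ // all space-separated.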
for i, stream := range streams {
lineno := i + 1
var anyFileTokens bool
var pos int64
var segIdx int
segments = segments[:0]
- for i, token := range strings.Split(stream, " ") {
+ pathparts = nil
+ streamparts := 0
+ for i, token := range bytes.Split(stream, []byte{' '}) {
if i == 0 {
- dirname = manifestUnescape(token)
+ pathparts = strings.Split(manifestUnescape(string(token)), "/")
+ streamparts = len(pathparts)
continue
}
- if !strings.Contains(token, ":") {
+ if !bytes.ContainsRune(token, ':') {
if anyFileTokens {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- toks := strings.SplitN(token, "+", 3)
- if len(toks) < 2 {
+ if splitToToks(token, '+') < 2 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
- length, err := strconv.ParseInt(toks[1], 10, 32)
+ length, err := strconv.ParseInt(string(toks[1]), 10, 32)
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
segments = append(segments, storedSegment{
- locator: token,
+ locator: string(token),
size: int(length),
offset: 0,
length: int(length),
} else if len(segments) == 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
-
- toks := strings.SplitN(token, ":", 3)
- if len(toks) != 3 {
+ if splitToToks(token, ':') != 3 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
anyFileTokens = true
- offset, err := strconv.ParseInt(toks[0], 10, 64)
+ offset, err := strconv.ParseInt(string(toks[0]), 10, 64)
if err != nil || offset < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- length, err := strconv.ParseInt(toks[1], 10, 64)
+ length, err := strconv.ParseInt(string(toks[1]), 10, 64)
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- name := dirname + "/" + manifestUnescape(toks[2])
- fnode, err := dn.createFileAndParents(name)
+ if !bytes.ContainsAny(toks[2], `\/`) {
+ // optimization for the common case: the name
+ // contains no slashes and no escape sequences,
+ // so it needs no unescaping or splitting
+ pathparts = append(pathparts[:streamparts], string(toks[2]))
+ } else {
+ pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+ }
+ fnode, err := dn.createFileAndParents(pathparts)
if fnode == nil && err == nil && length == 0 {
// Special case: an empty file used as
// a marker to preserve an otherwise
continue
}
if err != nil || (fnode == nil && length != 0) {
- return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
+ return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
}
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
return fmt.Errorf("line %d: no file segments", lineno)
} else if len(segments) == 0 {
return fmt.Errorf("line %d: no locators", lineno)
- } else if dirname == "" {
+ } else if streamparts == 0 {
return fmt.Errorf("line %d: no stream name", lineno)
}
}
//
// If path is a "parent directory exists" marker (the last path
// component is "."), the returned values are both nil.
-func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
+//
+// Newly added nodes have modtime==0. Caller is responsible for fixing
+// them with backdateTree.
+func (dn *dirnode) createFileAndParents(names []string) (fn *filenode, err error) {
var node inode = dn
- names := strings.Split(path, "/")
basename := names[len(names)-1]
for _, name := range names[:len(names)-1] {
switch name {
node = node.Parent()
continue
}
- modtime := node.Parent().FileInfo().ModTime()
node.Lock()
- locked := node
+ unlock := node.Unlock
node, err = node.Child(name, func(child inode) (inode, error) {
if child == nil {
- child, err := node.FS().newNode(name, 0755|os.ModeDir, modtime)
+ // note modtime will be fixed later in backdateTree()
+ child, err := node.FS().newNode(name, 0755|os.ModeDir, time.Time{})
if err != nil {
return nil, err
}
return child, nil
}
})
- locked.Unlock()
+ unlock()
if err != nil {
return
}
if basename == "." {
return
} else if !permittedName(basename) {
- err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+ err = fmt.Errorf("invalid file part %q in path %q", basename, names)
return
}
- modtime := node.FileInfo().ModTime()
node.Lock()
defer node.Unlock()
_, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
- child, err = node.FS().newNode(basename, 0755, modtime)
+ child, err = node.FS().newNode(basename, 0755, time.Time{})
if err != nil {
return nil, err
}
return
}
+func (dn *dirnode) Snapshot() (inode, error) {
+ return dn.snapshot()
+}
+
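+// snapshot returns a deep copy of the directory tree rooted at dn.
+// The copy has no fs pointers; they are filled in (via setTreeFS)
+// when the snapshot is spliced into a filesystem.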
+func (dn *dirnode) snapshot() (*dirnode, error) {
+ dn.RLock()
+ defer dn.RUnlock()
+ snap := &dirnode{
+ treenode: treenode{
+ inodes: make(map[string]inode, len(dn.inodes)),
+ fileinfo: dn.fileinfo,
+ },
+ }
+ for name, child := range dn.inodes {
+ dupchild, err := child.Snapshot()
+ if err != nil {
+ return nil, err
+ }
+ snap.inodes[name] = dupchild
+ dupchild.SetParent(snap, name)
+ }
+ return snap, nil
+}
+
+func (dn *dirnode) Splice(repl inode) error {
+ repl, err := repl.Snapshot()
+ if err != nil {
+ return fmt.Errorf("cannot copy snapshot: %w", err)
+ }
+ switch repl := repl.(type) {
+ default:
+ return fmt.Errorf("cannot splice snapshot containing %T: %w", repl, ErrInvalidArgument)
+ case *dirnode:
+ dn.Lock()
+ defer dn.Unlock()
+ dn.inodes = repl.inodes
+ dn.setTreeFS(dn.fs)
+ case *filenode:
+ dn.parent.Lock()
+ defer dn.parent.Unlock()
+ removing, err := dn.parent.Child(dn.fileinfo.name, nil)
+ if err != nil {
+ return fmt.Errorf("cannot use Splice to replace a top-level directory with a file: %w", ErrInvalidOperation)
+ } else if removing != dn {
+ // If ../thisdirname is not this dirnode, it
+ // must be an inode that wraps a dirnode, like
+ // a collectionFileSystem or deferrednode.
+ if deferred, ok := removing.(*deferrednode); ok {
+ // More useful to report the type of
+ // the wrapped node rather than just
+ // *deferrednode. (We know the real
+ // inode is already loaded because dn
+ // is inside it.)
+ removing = deferred.realinode()
+ }
+ return fmt.Errorf("cannot use Splice to attach a file at top level of %T: %w", removing, ErrInvalidOperation)
+ }
+ dn.Lock()
+ defer dn.Unlock()
+ _, err = dn.parent.Child(dn.fileinfo.name, func(inode) (inode, error) { return repl, nil })
+ if err != nil {
+ return fmt.Errorf("error replacing filenode: dn.parent.Child(): %w", err)
+ }
+ repl.fs = dn.fs
+ }
+ return nil
+}
+
+func (dn *dirnode) setTreeFS(fs *collectionFileSystem) {
+ dn.fs = fs
+ for _, child := range dn.inodes {
+ switch child := child.(type) {
+ case *dirnode:
+ child.setTreeFS(fs)
+ case *filenode:
+ child.fs = fs
+ }
+ }
+}
+
type segment interface {
io.ReaderAt
Len() int
// Return a new segment with a subsection of the data from this
// one. length<0 means length=Len()-off.
Slice(off int, length int) segment
+ memorySize() int64
}
type memSegment struct {
return
}
+func (me *memSegment) memorySize() int64 {
+ return 64 + int64(len(me.buf))
+}
+
type storedSegment struct {
kc fsBackend
locator string
return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
}
+func (se storedSegment) memorySize() int64 {
+ return 64 + int64(len(se.locator))
+}
+
func canonicalName(name string) string {
name = path.Clean("/" + name)
if name == "/" || name == "./" {