Merge branch '21258-flaky-adc-test'
[arvados.git] / sdk / go / arvados / fs_base.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "io/fs"
12         "log"
13         "net/http"
14         "os"
15         "path"
16         "path/filepath"
17         "strings"
18         "sync"
19         "time"
20 )
21
22 var (
23         ErrReadOnlyFile      = errors.New("read-only file")
24         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
25         ErrFileExists        = errors.New("file exists")
26         ErrInvalidOperation  = errors.New("invalid operation")
27         ErrInvalidArgument   = errors.New("invalid argument")
28         ErrDirectoryNotEmpty = errors.New("directory not empty")
29         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
30         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
31         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
32         ErrNotADirectory     = errors.New("not a directory")
33         ErrPermission        = os.ErrPermission
34         DebugLocksPanicMode  = false
35 )
36
37 type syncer interface {
38         Sync() error
39 }
40
41 func debugPanicIfNotLocked(l sync.Locker, writing bool) {
42         if !DebugLocksPanicMode {
43                 return
44         }
45         race := false
46         if rl, ok := l.(interface {
47                 RLock()
48                 RUnlock()
49         }); ok && writing {
50                 go func() {
51                         // Fail if we can grab the read lock during an
52                         // operation that purportedly has write lock.
53                         rl.RLock()
54                         race = true
55                         rl.RUnlock()
56                 }()
57         } else {
58                 go func() {
59                         l.Lock()
60                         race = true
61                         l.Unlock()
62                 }()
63         }
64         time.Sleep(100)
65         if race {
66                 panic("bug: caller-must-have-lock func called, but nobody has lock")
67         }
68 }
69
70 // A File is an *os.File-like interface for reading and writing files
71 // in a FileSystem.
72 type File interface {
73         io.Reader
74         io.Writer
75         io.Closer
76         io.Seeker
77         Size() int64
78         Readdir(int) ([]os.FileInfo, error)
79         Stat() (os.FileInfo, error)
80         Truncate(int64) error
81         Sync() error
82         // Create a snapshot of a file or directory tree, which can
83         // then be spliced onto a different path or a different
84         // collection.
85         Snapshot() (*Subtree, error)
86         // Replace this file or directory with the given snapshot.
87         // The target must be inside a collection: Splice returns an
88         // error if the File is a virtual file or directory like
89         // by_id, a project directory, .arvados#collection,
90         // etc. Splice can replace directories with regular files and
91         // vice versa, except it cannot replace the root directory of
92         // a collection with a regular file.
93         Splice(snapshot *Subtree) error
94 }
95
96 // A Subtree is a detached part of a filesystem tree that can be
97 // spliced into a filesystem via (File)Splice().
98 type Subtree struct {
99         inode inode
100 }
101
102 // A FileSystem is an http.Filesystem plus Stat() and support for
103 // opening writable files. All methods are safe to call from multiple
104 // goroutines.
105 type FileSystem interface {
106         http.FileSystem
107         fsBackend
108
109         rootnode() inode
110
111         // filesystem-wide lock: used by Rename() to prevent deadlock
112         // while locking multiple inodes.
113         locker() sync.Locker
114
115         // throttle for limiting concurrent background writers
116         throttle() *throttle
117
118         // create a new node with nil parent.
119         newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
120
121         // analogous to os.Stat()
122         Stat(name string) (os.FileInfo, error)
123
124         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
125         Create(name string) (File, error)
126
127         // Like os.OpenFile(): create or open a file or directory.
128         //
129         // If flag&os.O_EXCL==0, it opens an existing file or
130         // directory if one exists. If flag&os.O_CREATE!=0, it creates
131         // a new empty file or directory if one does not already
132         // exist.
133         //
134         // When creating a new item, perm&os.ModeDir determines
135         // whether it is a file or a directory.
136         //
137         // A file can be opened multiple times and used concurrently
138         // from multiple goroutines. However, each File object should
139         // be used by only one goroutine at a time.
140         OpenFile(name string, flag int, perm os.FileMode) (File, error)
141
142         Mkdir(name string, perm os.FileMode) error
143         Remove(name string) error
144         RemoveAll(name string) error
145         Rename(oldname, newname string) error
146
147         // Write buffered data from memory to storage, returning when
148         // all updates have been saved to persistent storage.
149         Sync() error
150
151         // Write buffered data from memory to storage, but don't wait
152         // for all writes to finish before returning. If shortBlocks
153         // is true, flush everything; otherwise, if there's less than
154         // a full block of buffered data at the end of a stream, leave
155         // it buffered in memory in case more data can be appended. If
156         // path is "", flush all dirs/streams; otherwise, flush only
157         // the specified dir/stream.
158         Flush(path string, shortBlocks bool) error
159
160         // Estimate current memory usage.
161         MemorySize() int64
162 }
163
164 type fsFS struct {
165         FileSystem
166 }
167
168 // FS returns an fs.FS interface to the given FileSystem, to enable
169 // the use of fs.WalkDir, etc.
170 func FS(fs FileSystem) fs.FS { return fsFS{fs} }
171 func (fs fsFS) Open(path string) (fs.File, error) {
172         f, err := fs.FileSystem.Open(path)
173         return f, err
174 }
175
176 type inode interface {
177         SetParent(parent inode, name string)
178         Parent() inode
179         FS() FileSystem
180         Read([]byte, filenodePtr) (int, filenodePtr, error)
181         Write([]byte, filenodePtr) (int, filenodePtr, error)
182         Truncate(int64) error
183         IsDir() bool
184         Readdir() ([]os.FileInfo, error)
185         Size() int64
186         FileInfo() os.FileInfo
187         // Create a snapshot of this node and its descendants.
188         Snapshot() (inode, error)
189         // Replace this node with a copy of the provided snapshot.
190         // Caller may provide the same snapshot to multiple Splice
191         // calls, but must not modify the snapshot concurrently.
192         Splice(inode) error
193
194         // Child() performs lookups and updates of named child nodes.
195         //
196         // (The term "child" here is used strictly. This means name is
197         // not "." or "..", and name does not contain "/".)
198         //
199         // If replace is non-nil, Child calls replace(x) where x is
200         // the current child inode with the given name. If possible,
201         // the child inode is replaced with the one returned by
202         // replace().
203         //
204         // If replace(x) returns an inode (besides x or nil) that is
205         // subsequently returned by Child(), then Child()'s caller
206         // must ensure the new child's name and parent are set/updated
207         // to Child()'s name argument and its receiver respectively.
208         // This is not necessarily done before replace(x) returns, but
209         // it must be done before Child()'s caller releases the
210         // parent's lock.
211         //
212         // Nil represents "no child". replace(nil) signifies that no
213         // child with this name exists yet. If replace() returns nil,
214         // the existing child should be deleted if possible.
215         //
216         // An implementation of Child() is permitted to ignore
217         // replace() or its return value. For example, a regular file
218         // inode does not have children, so Child() always returns
219         // nil.
220         //
221         // Child() returns the child, if any, with the given name: if
222         // a child was added or changed, the new child is returned.
223         //
224         // Caller must have lock (or rlock if replace is nil).
225         Child(name string, replace func(inode) (inode, error)) (inode, error)
226
227         sync.Locker
228         RLock()
229         RUnlock()
230         MemorySize() int64
231 }
232
233 type fileinfo struct {
234         name    string
235         mode    os.FileMode
236         size    int64
237         modTime time.Time
238         // If not nil, sys() returns the source data structure, which
239         // can be a *Collection, *Group, or nil. Currently populated
240         // only for project dirs and top-level collection dirs. Does
241         // not stay up to date with upstream changes.
242         //
243         // Intended to support keep-web's properties-as-s3-metadata
244         // feature (https://dev.arvados.org/issues/19088).
245         sys func() interface{}
246 }
247
248 // Name implements os.FileInfo.
249 func (fi fileinfo) Name() string {
250         return fi.name
251 }
252
253 // ModTime implements os.FileInfo.
254 func (fi fileinfo) ModTime() time.Time {
255         return fi.modTime
256 }
257
258 // Mode implements os.FileInfo.
259 func (fi fileinfo) Mode() os.FileMode {
260         return fi.mode
261 }
262
263 // IsDir implements os.FileInfo.
264 func (fi fileinfo) IsDir() bool {
265         return fi.mode&os.ModeDir != 0
266 }
267
268 // Size implements os.FileInfo.
269 func (fi fileinfo) Size() int64 {
270         return fi.size
271 }
272
273 // Sys implements os.FileInfo. See comment in fileinfo struct.
274 func (fi fileinfo) Sys() interface{} {
275         if fi.sys == nil {
276                 return nil
277         }
278         return fi.sys()
279 }
280
281 type nullnode struct{}
282
283 func (*nullnode) Mkdir(string, os.FileMode) error {
284         return ErrInvalidOperation
285 }
286
287 func (*nullnode) Read([]byte, filenodePtr) (int, filenodePtr, error) {
288         return 0, filenodePtr{}, ErrInvalidOperation
289 }
290
291 func (*nullnode) Write([]byte, filenodePtr) (int, filenodePtr, error) {
292         return 0, filenodePtr{}, ErrInvalidOperation
293 }
294
295 func (*nullnode) Truncate(int64) error {
296         return ErrInvalidOperation
297 }
298
299 func (*nullnode) FileInfo() os.FileInfo {
300         return fileinfo{}
301 }
302
303 func (*nullnode) IsDir() bool {
304         return false
305 }
306
307 func (*nullnode) Readdir() ([]os.FileInfo, error) {
308         return nil, ErrInvalidOperation
309 }
310
311 func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
312         return nil, ErrNotADirectory
313 }
314
315 func (*nullnode) MemorySize() int64 {
316         // Types that embed nullnode should report their own size, but
317         // if they don't, we at least report a non-zero size to ensure
318         // a large tree doesn't get reported as 0 bytes.
319         return 64
320 }
321
322 func (*nullnode) Snapshot() (inode, error) {
323         return nil, ErrInvalidOperation
324 }
325
326 func (*nullnode) Splice(inode) error {
327         return ErrInvalidOperation
328 }
329
330 type treenode struct {
331         fs       FileSystem
332         parent   inode
333         inodes   map[string]inode
334         fileinfo fileinfo
335         sync.RWMutex
336         nullnode
337 }
338
339 func (n *treenode) FS() FileSystem {
340         return n.fs
341 }
342
343 func (n *treenode) SetParent(p inode, name string) {
344         n.Lock()
345         defer n.Unlock()
346         n.parent = p
347         n.fileinfo.name = name
348 }
349
350 func (n *treenode) Parent() inode {
351         n.RLock()
352         defer n.RUnlock()
353         return n.parent
354 }
355
356 func (n *treenode) IsDir() bool {
357         return true
358 }
359
360 func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) {
361         debugPanicIfNotLocked(n, false)
362         child = n.inodes[name]
363         if name == "" || name == "." || name == ".." {
364                 err = ErrInvalidArgument
365                 return
366         }
367         if replace == nil {
368                 return
369         }
370         newchild, err := replace(child)
371         if err != nil {
372                 return
373         }
374         if newchild == nil {
375                 debugPanicIfNotLocked(n, true)
376                 delete(n.inodes, name)
377         } else if newchild != child {
378                 debugPanicIfNotLocked(n, true)
379                 n.inodes[name] = newchild
380                 n.fileinfo.modTime = time.Now()
381                 child = newchild
382         }
383         return
384 }
385
386 func (n *treenode) Size() int64 {
387         return n.FileInfo().Size()
388 }
389
390 func (n *treenode) FileInfo() os.FileInfo {
391         n.RLock()
392         defer n.RUnlock()
393         fi := n.fileinfo
394         fi.size = int64(len(n.inodes))
395         return fi
396 }
397
398 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
399         // We need RLock to safely read n.inodes, but we must release
400         // it before calling FileInfo() on the child nodes. Otherwise,
401         // we risk deadlock when filter groups A and B match each
402         // other, concurrent Readdir() calls try to RLock them in
403         // opposite orders, and one cannot be RLocked a second time
404         // because a third caller is waiting for a write lock.
405         n.RLock()
406         inodes := make([]inode, 0, len(n.inodes))
407         for _, inode := range n.inodes {
408                 inodes = append(inodes, inode)
409         }
410         n.RUnlock()
411         fi = make([]os.FileInfo, 0, len(inodes))
412         for _, inode := range inodes {
413                 fi = append(fi, inode.FileInfo())
414         }
415         return
416 }
417
418 func (n *treenode) Sync() error {
419         n.RLock()
420         defer n.RUnlock()
421         for _, inode := range n.inodes {
422                 syncer, ok := inode.(syncer)
423                 if !ok {
424                         return ErrInvalidOperation
425                 }
426                 err := syncer.Sync()
427                 if err != nil {
428                         return err
429                 }
430         }
431         return nil
432 }
433
434 func (n *treenode) MemorySize() (size int64) {
435         // To avoid making other callers wait while we count the
436         // entire filesystem size, we lock the node only long enough
437         // to copy the list of children. We accept that the resulting
438         // size will sometimes be misleading (e.g., we will
439         // double-count an item that moves from A to B after we check
440         // A's size but before we check B's size).
441         n.RLock()
442         debugPanicIfNotLocked(n, false)
443         todo := make([]inode, 0, len(n.inodes))
444         for _, inode := range n.inodes {
445                 todo = append(todo, inode)
446         }
447         n.RUnlock()
448         for _, inode := range todo {
449                 size += inode.MemorySize()
450         }
451         return 64 + size
452 }
453
454 type fileSystem struct {
455         root inode
456         fsBackend
457         mutex sync.Mutex
458         thr   *throttle
459 }
460
461 func (fs *fileSystem) rootnode() inode {
462         return fs.root
463 }
464
465 func (fs *fileSystem) throttle() *throttle {
466         return fs.thr
467 }
468
469 func (fs *fileSystem) locker() sync.Locker {
470         return &fs.mutex
471 }
472
473 // OpenFile is analogous to os.OpenFile().
474 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
475         return fs.openFile(name, flag, perm)
476 }
477
478 func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
479         if flag&os.O_SYNC != 0 {
480                 return nil, ErrSyncNotSupported
481         }
482         dirname, name := path.Split(name)
483         ancestors := map[inode]bool{}
484         parent, err := rlookup(fs.root, dirname, ancestors)
485         if err != nil {
486                 return nil, err
487         }
488         var readable, writable bool
489         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
490         case os.O_RDWR:
491                 readable = true
492                 writable = true
493         case os.O_RDONLY:
494                 readable = true
495         case os.O_WRONLY:
496                 writable = true
497         default:
498                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
499         }
500         if parent.IsDir() {
501                 // A directory can be opened via "foo/", "foo/.", or
502                 // "foo/..".
503                 switch name {
504                 case ".", "":
505                         return &filehandle{inode: parent, readable: readable, writable: writable}, nil
506                 case "..":
507                         return &filehandle{inode: parent.Parent(), readable: readable, writable: writable}, nil
508                 }
509         }
510         createMode := flag&os.O_CREATE != 0
511         // We always need to take Lock() here, not just RLock(). Even
512         // if we know we won't be creating a file, parent might be a
513         // lookupnode, which sometimes populates its inodes map during
514         // a Child() call.
515         parent.Lock()
516         defer parent.Unlock()
517         n, err := parent.Child(name, nil)
518         if err != nil {
519                 return nil, err
520         } else if n == nil {
521                 if !createMode {
522                         return nil, os.ErrNotExist
523                 }
524                 n, err = parent.Child(name, func(inode) (repl inode, err error) {
525                         repl, err = parent.FS().newNode(name, perm|0755, time.Now())
526                         if err != nil {
527                                 return
528                         }
529                         repl.SetParent(parent, name)
530                         return
531                 })
532                 if err != nil {
533                         return nil, err
534                 } else if n == nil {
535                         // Parent rejected new child, but returned no error
536                         return nil, ErrInvalidArgument
537                 }
538         } else if flag&os.O_EXCL != 0 {
539                 return nil, ErrFileExists
540         } else if flag&os.O_TRUNC != 0 {
541                 if !writable {
542                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
543                 } else if n.IsDir() {
544                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
545                 } else if err := n.Truncate(0); err != nil {
546                         return nil, err
547                 }
548         }
549         // If n and one of its parents/ancestors are [hardlinks to]
550         // the same node (e.g., a filter group that matches itself),
551         // open an "empty directory" node instead, so the inner
552         // hardlink appears empty. This is needed to ensure
553         // Open("a/b/c/x/x").Readdir() appears empty, matching the
554         // behavior of rlookup("a/b/c/x/x/z") => ErrNotExist.
555         if hl, ok := n.(*hardlink); (ok && ancestors[hl.inode]) || ancestors[n] {
556                 n = &treenode{
557                         fs:     n.FS(),
558                         parent: parent,
559                         inodes: nil,
560                         fileinfo: fileinfo{
561                                 name:    name,
562                                 modTime: time.Now(),
563                                 mode:    0555 | os.ModeDir,
564                         },
565                 }
566         }
567         return &filehandle{
568                 inode:    n,
569                 append:   flag&os.O_APPEND != 0,
570                 readable: readable,
571                 writable: writable,
572         }, nil
573 }
574
575 func (fs *fileSystem) Open(name string) (http.File, error) {
576         return fs.OpenFile(name, os.O_RDONLY, 0)
577 }
578
579 func (fs *fileSystem) Create(name string) (File, error) {
580         return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
581 }
582
583 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
584         dirname, name := path.Split(name)
585         n, err := rlookup(fs.root, dirname, nil)
586         if err != nil {
587                 return err
588         }
589         n.Lock()
590         defer n.Unlock()
591         if child, err := n.Child(name, nil); err != nil {
592                 return err
593         } else if child != nil {
594                 return os.ErrExist
595         }
596
597         _, err = n.Child(name, func(inode) (repl inode, err error) {
598                 repl, err = n.FS().newNode(name, perm|os.ModeDir, time.Now())
599                 if err != nil {
600                         return
601                 }
602                 repl.SetParent(n, name)
603                 return
604         })
605         return err
606 }
607
608 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
609         node, err := rlookup(fs.root, name, nil)
610         if err != nil {
611                 return nil, err
612         }
613         return node.FileInfo(), nil
614 }
615
616 func (fs *fileSystem) Rename(oldname, newname string) error {
617         olddir, oldname := path.Split(oldname)
618         if oldname == "" || oldname == "." || oldname == ".." {
619                 return ErrInvalidArgument
620         }
621         olddirf, err := fs.openFile(olddir+".", os.O_RDONLY, 0)
622         if err != nil {
623                 return fmt.Errorf("%q: %s", olddir, err)
624         }
625         defer olddirf.Close()
626
627         newdir, newname := path.Split(newname)
628         if newname == "." || newname == ".." {
629                 return ErrInvalidArgument
630         } else if newname == "" {
631                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
632                 newname = oldname
633         }
634         newdirf, err := fs.openFile(newdir+".", os.O_RDONLY, 0)
635         if err != nil {
636                 return fmt.Errorf("%q: %s", newdir, err)
637         }
638         defer newdirf.Close()
639
640         // TODO: If the nearest common ancestor ("nca") of olddirf and
641         // newdirf is on a different filesystem than fs, we should
642         // call nca.FS().Rename() instead of proceeding. Until then
643         // it's awkward for filesystems to implement their own Rename
644         // methods effectively: the only one that runs is the one on
645         // the root FileSystem exposed to the caller (webdav, fuse,
646         // etc).
647
648         // When acquiring locks on multiple inodes, avoid deadlock by
649         // locking the entire containing filesystem first.
650         cfs := olddirf.inode.FS()
651         cfs.locker().Lock()
652         defer cfs.locker().Unlock()
653
654         if cfs != newdirf.inode.FS() {
655                 // Moving inodes across filesystems is not (yet)
656                 // supported. Locking inodes from different
657                 // filesystems could deadlock, so we must error out
658                 // now.
659                 return ErrInvalidOperation
660         }
661
662         // To ensure we can test reliably whether we're about to move
663         // a directory into itself, lock all potential common
664         // ancestors of olddir and newdir.
665         needLock := []sync.Locker{}
666         for _, node := range []inode{olddirf.inode, newdirf.inode} {
667                 needLock = append(needLock, node)
668                 for node.Parent() != node && node.Parent().FS() == node.FS() {
669                         node = node.Parent()
670                         needLock = append(needLock, node)
671                 }
672         }
673         locked := map[sync.Locker]bool{}
674         for i := len(needLock) - 1; i >= 0; i-- {
675                 n := needLock[i]
676                 if fs, ok := n.(interface{ rootnode() inode }); ok {
677                         // Lock the fs's root dir directly, not
678                         // through the fs. Otherwise our "locked" map
679                         // would not reliably prevent double-locking
680                         // the fs's root dir.
681                         n = fs.rootnode()
682                 }
683                 if !locked[n] {
684                         n.Lock()
685                         defer n.Unlock()
686                         locked[n] = true
687                 }
688         }
689
690         _, err = olddirf.inode.Child(oldname, func(oldinode inode) (inode, error) {
691                 if oldinode == nil {
692                         return oldinode, os.ErrNotExist
693                 }
694                 if locked[oldinode] {
695                         // oldinode cannot become a descendant of itself.
696                         return oldinode, ErrInvalidArgument
697                 }
698                 if oldinode.FS() != cfs && newdirf.inode != olddirf.inode {
699                         // moving a mount point to a different parent
700                         // is not (yet) supported.
701                         return oldinode, ErrInvalidArgument
702                 }
703                 accepted, err := newdirf.inode.Child(newname, func(existing inode) (inode, error) {
704                         if existing != nil && existing.IsDir() {
705                                 return existing, ErrIsDirectory
706                         }
707                         return oldinode, nil
708                 })
709                 if err != nil {
710                         // Leave oldinode in olddir.
711                         return oldinode, err
712                 }
713                 accepted.SetParent(newdirf.inode, newname)
714                 return nil, nil
715         })
716         return err
717 }
718
719 func (fs *fileSystem) Remove(name string) error {
720         return fs.remove(strings.TrimRight(name, "/"), false)
721 }
722
723 func (fs *fileSystem) RemoveAll(name string) error {
724         err := fs.remove(strings.TrimRight(name, "/"), true)
725         if os.IsNotExist(err) {
726                 // "If the path does not exist, RemoveAll returns
727                 // nil." (see "os" pkg)
728                 err = nil
729         }
730         return err
731 }
732
733 func (fs *fileSystem) remove(name string, recursive bool) error {
734         dirname, name := path.Split(name)
735         if name == "" || name == "." || name == ".." {
736                 return ErrInvalidArgument
737         }
738         dir, err := rlookup(fs.root, dirname, nil)
739         if err != nil {
740                 return err
741         }
742         dir.Lock()
743         defer dir.Unlock()
744         _, err = dir.Child(name, func(node inode) (inode, error) {
745                 if node == nil {
746                         return nil, os.ErrNotExist
747                 }
748                 if !recursive && node.IsDir() && node.Size() > 0 {
749                         return node, ErrDirectoryNotEmpty
750                 }
751                 return nil, nil
752         })
753         return err
754 }
755
756 func (fs *fileSystem) Sync() error {
757         if syncer, ok := fs.root.(syncer); ok {
758                 return syncer.Sync()
759         }
760         return ErrInvalidOperation
761 }
762
763 func (fs *fileSystem) Flush(string, bool) error {
764         log.Printf("TODO: flush fileSystem")
765         return ErrInvalidOperation
766 }
767
768 func (fs *fileSystem) MemorySize() int64 {
769         return fs.root.MemorySize()
770 }
771
772 // rlookup (recursive lookup) returns the inode for the file/directory
773 // with the given name (which may contain "/" separators). If no such
774 // file/directory exists, the returned node is nil.
775 //
776 // The visited map should be either nil or empty. If non-nil, all
777 // nodes and hardlink targets visited by the given path will be added
778 // to it.
779 //
780 // If a cycle is detected, the second occurrence of the offending node
781 // will be replaced by an empty directory. For example, if "x" is a
782 // filter group that matches itself, then rlookup("a/b/c/x") will
783 // return the filter group, and rlookup("a/b/c/x/x") will return an
784 // empty directory.
785 func rlookup(start inode, path string, visited map[inode]bool) (node inode, err error) {
786         if visited == nil {
787                 visited = map[inode]bool{}
788         }
789         node = start
790         // Clean up ./ and ../ and double-slashes, but (unlike
791         // filepath.Clean) retain a trailing slash, because looking up
792         // ".../regularfile/" should fail.
793         trailingSlash := strings.HasSuffix(path, "/")
794         path = filepath.Clean(path)
795         if trailingSlash && path != "/" {
796                 path += "/"
797         }
798         for _, name := range strings.Split(path, "/") {
799                 visited[node] = true
800                 if node.IsDir() {
801                         if name == "." || name == "" {
802                                 continue
803                         }
804                         if name == ".." {
805                                 node = node.Parent()
806                                 continue
807                         }
808                 }
809                 node, err = func() (inode, error) {
810                         node.Lock()
811                         defer node.Unlock()
812                         return node.Child(name, nil)
813                 }()
814                 if node == nil || err != nil {
815                         break
816                 }
817                 checknode := node
818                 if hardlinked, ok := checknode.(*hardlink); ok {
819                         checknode = hardlinked.inode
820                 }
821                 if visited[checknode] {
822                         node = &treenode{
823                                 fs:     node.FS(),
824                                 parent: node.Parent(),
825                                 inodes: nil,
826                                 fileinfo: fileinfo{
827                                         name:    name,
828                                         modTime: time.Now(),
829                                         mode:    0555 | os.ModeDir,
830                                 },
831                         }
832                 } else {
833                         visited[checknode] = true
834                 }
835         }
836         if node == nil && err == nil {
837                 err = os.ErrNotExist
838         }
839         return
840 }
841
842 func permittedName(name string) bool {
843         return name != "" && name != "." && name != ".." && !strings.Contains(name, "/")
844 }
845
846 // Snapshot returns a Subtree that's a copy of the given path. It
847 // returns an error if the path is not inside a collection.
848 func Snapshot(fs FileSystem, path string) (*Subtree, error) {
849         f, err := fs.OpenFile(path, os.O_RDONLY, 0)
850         if err != nil {
851                 return nil, err
852         }
853         defer f.Close()
854         return f.Snapshot()
855 }
856
857 // Splice inserts newsubtree at the indicated target path.
858 //
859 // Splice returns an error if target is not inside a collection.
860 //
861 // Splice returns an error if target is the root of a collection and
862 // newsubtree is a snapshot of a file.
863 func Splice(fs FileSystem, target string, newsubtree *Subtree) error {
864         f, err := fs.OpenFile(target, os.O_WRONLY, 0)
865         if os.IsNotExist(err) {
866                 f, err = fs.OpenFile(target, os.O_CREATE|os.O_WRONLY, 0700)
867         }
868         if err != nil {
869                 return fmt.Errorf("open %s: %w", target, err)
870         }
871         defer f.Close()
872         return f.Splice(newsubtree)
873 }