19620: Remove old "v1" S3 keepstore driver.
[arvados.git] / sdk / go / arvados / fs_base.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "io/fs"
12         "log"
13         "net/http"
14         "os"
15         "path"
16         "strings"
17         "sync"
18         "time"
19 )
20
21 var (
22         ErrReadOnlyFile      = errors.New("read-only file")
23         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
24         ErrFileExists        = errors.New("file exists")
25         ErrInvalidOperation  = errors.New("invalid operation")
26         ErrInvalidArgument   = errors.New("invalid argument")
27         ErrDirectoryNotEmpty = errors.New("directory not empty")
28         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
29         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
30         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
31         ErrNotADirectory     = errors.New("not a directory")
32         ErrPermission        = os.ErrPermission
33         DebugLocksPanicMode  = false
34 )
35
36 type syncer interface {
37         Sync() error
38 }
39
40 func debugPanicIfNotLocked(l sync.Locker, writing bool) {
41         if !DebugLocksPanicMode {
42                 return
43         }
44         race := false
45         if rl, ok := l.(interface {
46                 RLock()
47                 RUnlock()
48         }); ok && writing {
49                 go func() {
50                         // Fail if we can grab the read lock during an
51                         // operation that purportedly has write lock.
52                         rl.RLock()
53                         race = true
54                         rl.RUnlock()
55                 }()
56         } else {
57                 go func() {
58                         l.Lock()
59                         race = true
60                         l.Unlock()
61                 }()
62         }
63         time.Sleep(100)
64         if race {
65                 panic("bug: caller-must-have-lock func called, but nobody has lock")
66         }
67 }
68
69 // A File is an *os.File-like interface for reading and writing files
70 // in a FileSystem.
71 type File interface {
72         io.Reader
73         io.Writer
74         io.Closer
75         io.Seeker
76         Size() int64
77         Readdir(int) ([]os.FileInfo, error)
78         Stat() (os.FileInfo, error)
79         Truncate(int64) error
80         Sync() error
81         // Create a snapshot of a file or directory tree, which can
82         // then be spliced onto a different path or a different
83         // collection.
84         Snapshot() (*Subtree, error)
85         // Replace this file or directory with the given snapshot.
86         // The target must be inside a collection: Splice returns an
87         // error if the File is a virtual file or directory like
88         // by_id, a project directory, .arvados#collection,
89         // etc. Splice can replace directories with regular files and
90         // vice versa, except it cannot replace the root directory of
91         // a collection with a regular file.
92         Splice(snapshot *Subtree) error
93 }
94
95 // A Subtree is a detached part of a filesystem tree that can be
96 // spliced into a filesystem via (File)Splice().
97 type Subtree struct {
98         inode inode
99 }
100
101 // A FileSystem is an http.Filesystem plus Stat() and support for
102 // opening writable files. All methods are safe to call from multiple
103 // goroutines.
104 type FileSystem interface {
105         http.FileSystem
106         fsBackend
107
108         rootnode() inode
109
110         // filesystem-wide lock: used by Rename() to prevent deadlock
111         // while locking multiple inodes.
112         locker() sync.Locker
113
114         // throttle for limiting concurrent background writers
115         throttle() *throttle
116
117         // create a new node with nil parent.
118         newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
119
120         // analogous to os.Stat()
121         Stat(name string) (os.FileInfo, error)
122
123         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
124         Create(name string) (File, error)
125
126         // Like os.OpenFile(): create or open a file or directory.
127         //
128         // If flag&os.O_EXCL==0, it opens an existing file or
129         // directory if one exists. If flag&os.O_CREATE!=0, it creates
130         // a new empty file or directory if one does not already
131         // exist.
132         //
133         // When creating a new item, perm&os.ModeDir determines
134         // whether it is a file or a directory.
135         //
136         // A file can be opened multiple times and used concurrently
137         // from multiple goroutines. However, each File object should
138         // be used by only one goroutine at a time.
139         OpenFile(name string, flag int, perm os.FileMode) (File, error)
140
141         Mkdir(name string, perm os.FileMode) error
142         Remove(name string) error
143         RemoveAll(name string) error
144         Rename(oldname, newname string) error
145
146         // Write buffered data from memory to storage, returning when
147         // all updates have been saved to persistent storage.
148         Sync() error
149
150         // Write buffered data from memory to storage, but don't wait
151         // for all writes to finish before returning. If shortBlocks
152         // is true, flush everything; otherwise, if there's less than
153         // a full block of buffered data at the end of a stream, leave
154         // it buffered in memory in case more data can be appended. If
155         // path is "", flush all dirs/streams; otherwise, flush only
156         // the specified dir/stream.
157         Flush(path string, shortBlocks bool) error
158
159         // Estimate current memory usage.
160         MemorySize() int64
161 }
162
163 type fsFS struct {
164         FileSystem
165 }
166
167 // FS returns an fs.FS interface to the given FileSystem, to enable
168 // the use of fs.WalkDir, etc.
169 func FS(fs FileSystem) fs.FS { return fsFS{fs} }
170 func (fs fsFS) Open(path string) (fs.File, error) {
171         f, err := fs.FileSystem.Open(path)
172         return f, err
173 }
174
175 type inode interface {
176         SetParent(parent inode, name string)
177         Parent() inode
178         FS() FileSystem
179         Read([]byte, filenodePtr) (int, filenodePtr, error)
180         Write([]byte, filenodePtr) (int, filenodePtr, error)
181         Truncate(int64) error
182         IsDir() bool
183         Readdir() ([]os.FileInfo, error)
184         Size() int64
185         FileInfo() os.FileInfo
186         // Create a snapshot of this node and its descendants.
187         Snapshot() (inode, error)
188         // Replace this node with a copy of the provided snapshot.
189         // Caller may provide the same snapshot to multiple Splice
190         // calls, but must not modify the snapshot concurrently.
191         Splice(inode) error
192
193         // Child() performs lookups and updates of named child nodes.
194         //
195         // (The term "child" here is used strictly. This means name is
196         // not "." or "..", and name does not contain "/".)
197         //
198         // If replace is non-nil, Child calls replace(x) where x is
199         // the current child inode with the given name. If possible,
200         // the child inode is replaced with the one returned by
201         // replace().
202         //
203         // If replace(x) returns an inode (besides x or nil) that is
204         // subsequently returned by Child(), then Child()'s caller
205         // must ensure the new child's name and parent are set/updated
206         // to Child()'s name argument and its receiver respectively.
207         // This is not necessarily done before replace(x) returns, but
208         // it must be done before Child()'s caller releases the
209         // parent's lock.
210         //
211         // Nil represents "no child". replace(nil) signifies that no
212         // child with this name exists yet. If replace() returns nil,
213         // the existing child should be deleted if possible.
214         //
215         // An implementation of Child() is permitted to ignore
216         // replace() or its return value. For example, a regular file
217         // inode does not have children, so Child() always returns
218         // nil.
219         //
220         // Child() returns the child, if any, with the given name: if
221         // a child was added or changed, the new child is returned.
222         //
223         // Caller must have lock (or rlock if replace is nil).
224         Child(name string, replace func(inode) (inode, error)) (inode, error)
225
226         sync.Locker
227         RLock()
228         RUnlock()
229         MemorySize() int64
230 }
231
232 type fileinfo struct {
233         name    string
234         mode    os.FileMode
235         size    int64
236         modTime time.Time
237         // If not nil, sys() returns the source data structure, which
238         // can be a *Collection, *Group, or nil. Currently populated
239         // only for project dirs and top-level collection dirs. Does
240         // not stay up to date with upstream changes.
241         //
242         // Intended to support keep-web's properties-as-s3-metadata
243         // feature (https://dev.arvados.org/issues/19088).
244         sys func() interface{}
245 }
246
247 // Name implements os.FileInfo.
248 func (fi fileinfo) Name() string {
249         return fi.name
250 }
251
252 // ModTime implements os.FileInfo.
253 func (fi fileinfo) ModTime() time.Time {
254         return fi.modTime
255 }
256
257 // Mode implements os.FileInfo.
258 func (fi fileinfo) Mode() os.FileMode {
259         return fi.mode
260 }
261
262 // IsDir implements os.FileInfo.
263 func (fi fileinfo) IsDir() bool {
264         return fi.mode&os.ModeDir != 0
265 }
266
267 // Size implements os.FileInfo.
268 func (fi fileinfo) Size() int64 {
269         return fi.size
270 }
271
272 // Sys implements os.FileInfo. See comment in fileinfo struct.
273 func (fi fileinfo) Sys() interface{} {
274         if fi.sys == nil {
275                 return nil
276         }
277         return fi.sys()
278 }
279
280 type nullnode struct{}
281
282 func (*nullnode) Mkdir(string, os.FileMode) error {
283         return ErrInvalidOperation
284 }
285
286 func (*nullnode) Read([]byte, filenodePtr) (int, filenodePtr, error) {
287         return 0, filenodePtr{}, ErrInvalidOperation
288 }
289
290 func (*nullnode) Write([]byte, filenodePtr) (int, filenodePtr, error) {
291         return 0, filenodePtr{}, ErrInvalidOperation
292 }
293
294 func (*nullnode) Truncate(int64) error {
295         return ErrInvalidOperation
296 }
297
298 func (*nullnode) FileInfo() os.FileInfo {
299         return fileinfo{}
300 }
301
302 func (*nullnode) IsDir() bool {
303         return false
304 }
305
306 func (*nullnode) Readdir() ([]os.FileInfo, error) {
307         return nil, ErrInvalidOperation
308 }
309
310 func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
311         return nil, ErrNotADirectory
312 }
313
314 func (*nullnode) MemorySize() int64 {
315         // Types that embed nullnode should report their own size, but
316         // if they don't, we at least report a non-zero size to ensure
317         // a large tree doesn't get reported as 0 bytes.
318         return 64
319 }
320
321 func (*nullnode) Snapshot() (inode, error) {
322         return nil, ErrInvalidOperation
323 }
324
325 func (*nullnode) Splice(inode) error {
326         return ErrInvalidOperation
327 }
328
329 type treenode struct {
330         fs       FileSystem
331         parent   inode
332         inodes   map[string]inode
333         fileinfo fileinfo
334         sync.RWMutex
335         nullnode
336 }
337
338 func (n *treenode) FS() FileSystem {
339         return n.fs
340 }
341
342 func (n *treenode) SetParent(p inode, name string) {
343         n.Lock()
344         defer n.Unlock()
345         n.parent = p
346         n.fileinfo.name = name
347 }
348
349 func (n *treenode) Parent() inode {
350         n.RLock()
351         defer n.RUnlock()
352         return n.parent
353 }
354
355 func (n *treenode) IsDir() bool {
356         return true
357 }
358
359 func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) {
360         debugPanicIfNotLocked(n, false)
361         child = n.inodes[name]
362         if name == "" || name == "." || name == ".." {
363                 err = ErrInvalidArgument
364                 return
365         }
366         if replace == nil {
367                 return
368         }
369         newchild, err := replace(child)
370         if err != nil {
371                 return
372         }
373         if newchild == nil {
374                 debugPanicIfNotLocked(n, true)
375                 delete(n.inodes, name)
376         } else if newchild != child {
377                 debugPanicIfNotLocked(n, true)
378                 n.inodes[name] = newchild
379                 n.fileinfo.modTime = time.Now()
380                 child = newchild
381         }
382         return
383 }
384
385 func (n *treenode) Size() int64 {
386         return n.FileInfo().Size()
387 }
388
389 func (n *treenode) FileInfo() os.FileInfo {
390         n.Lock()
391         defer n.Unlock()
392         n.fileinfo.size = int64(len(n.inodes))
393         return n.fileinfo
394 }
395
396 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
397         n.RLock()
398         defer n.RUnlock()
399         fi = make([]os.FileInfo, 0, len(n.inodes))
400         for _, inode := range n.inodes {
401                 fi = append(fi, inode.FileInfo())
402         }
403         return
404 }
405
406 func (n *treenode) Sync() error {
407         n.RLock()
408         defer n.RUnlock()
409         for _, inode := range n.inodes {
410                 syncer, ok := inode.(syncer)
411                 if !ok {
412                         return ErrInvalidOperation
413                 }
414                 err := syncer.Sync()
415                 if err != nil {
416                         return err
417                 }
418         }
419         return nil
420 }
421
422 func (n *treenode) MemorySize() (size int64) {
423         // To avoid making other callers wait while we count the
424         // entire filesystem size, we lock the node only long enough
425         // to copy the list of children. We accept that the resulting
426         // size will sometimes be misleading (e.g., we will
427         // double-count an item that moves from A to B after we check
428         // A's size but before we check B's size).
429         n.RLock()
430         debugPanicIfNotLocked(n, false)
431         todo := make([]inode, 0, len(n.inodes))
432         for _, inode := range n.inodes {
433                 todo = append(todo, inode)
434         }
435         n.RUnlock()
436         for _, inode := range todo {
437                 size += inode.MemorySize()
438         }
439         return 64 + size
440 }
441
442 type fileSystem struct {
443         root inode
444         fsBackend
445         mutex sync.Mutex
446         thr   *throttle
447 }
448
449 func (fs *fileSystem) rootnode() inode {
450         return fs.root
451 }
452
453 func (fs *fileSystem) throttle() *throttle {
454         return fs.thr
455 }
456
457 func (fs *fileSystem) locker() sync.Locker {
458         return &fs.mutex
459 }
460
461 // OpenFile is analogous to os.OpenFile().
462 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
463         return fs.openFile(name, flag, perm)
464 }
465
466 func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
467         if flag&os.O_SYNC != 0 {
468                 return nil, ErrSyncNotSupported
469         }
470         dirname, name := path.Split(name)
471         parent, err := rlookup(fs.root, dirname)
472         if err != nil {
473                 return nil, err
474         }
475         var readable, writable bool
476         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
477         case os.O_RDWR:
478                 readable = true
479                 writable = true
480         case os.O_RDONLY:
481                 readable = true
482         case os.O_WRONLY:
483                 writable = true
484         default:
485                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
486         }
487         if parent.IsDir() {
488                 // A directory can be opened via "foo/", "foo/.", or
489                 // "foo/..".
490                 switch name {
491                 case ".", "":
492                         return &filehandle{inode: parent, readable: readable, writable: writable}, nil
493                 case "..":
494                         return &filehandle{inode: parent.Parent(), readable: readable, writable: writable}, nil
495                 }
496         }
497         createMode := flag&os.O_CREATE != 0
498         // We always need to take Lock() here, not just RLock(). Even
499         // if we know we won't be creating a file, parent might be a
500         // lookupnode, which sometimes populates its inodes map during
501         // a Child() call.
502         parent.Lock()
503         defer parent.Unlock()
504         n, err := parent.Child(name, nil)
505         if err != nil {
506                 return nil, err
507         } else if n == nil {
508                 if !createMode {
509                         return nil, os.ErrNotExist
510                 }
511                 n, err = parent.Child(name, func(inode) (repl inode, err error) {
512                         repl, err = parent.FS().newNode(name, perm|0755, time.Now())
513                         if err != nil {
514                                 return
515                         }
516                         repl.SetParent(parent, name)
517                         return
518                 })
519                 if err != nil {
520                         return nil, err
521                 } else if n == nil {
522                         // Parent rejected new child, but returned no error
523                         return nil, ErrInvalidArgument
524                 }
525         } else if flag&os.O_EXCL != 0 {
526                 return nil, ErrFileExists
527         } else if flag&os.O_TRUNC != 0 {
528                 if !writable {
529                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
530                 } else if n.IsDir() {
531                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
532                 } else if err := n.Truncate(0); err != nil {
533                         return nil, err
534                 }
535         }
536         return &filehandle{
537                 inode:    n,
538                 append:   flag&os.O_APPEND != 0,
539                 readable: readable,
540                 writable: writable,
541         }, nil
542 }
543
544 func (fs *fileSystem) Open(name string) (http.File, error) {
545         return fs.OpenFile(name, os.O_RDONLY, 0)
546 }
547
548 func (fs *fileSystem) Create(name string) (File, error) {
549         return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
550 }
551
552 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
553         dirname, name := path.Split(name)
554         n, err := rlookup(fs.root, dirname)
555         if err != nil {
556                 return err
557         }
558         n.Lock()
559         defer n.Unlock()
560         if child, err := n.Child(name, nil); err != nil {
561                 return err
562         } else if child != nil {
563                 return os.ErrExist
564         }
565
566         _, err = n.Child(name, func(inode) (repl inode, err error) {
567                 repl, err = n.FS().newNode(name, perm|os.ModeDir, time.Now())
568                 if err != nil {
569                         return
570                 }
571                 repl.SetParent(n, name)
572                 return
573         })
574         return err
575 }
576
577 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
578         node, err := rlookup(fs.root, name)
579         if err != nil {
580                 return nil, err
581         }
582         return node.FileInfo(), nil
583 }
584
585 func (fs *fileSystem) Rename(oldname, newname string) error {
586         olddir, oldname := path.Split(oldname)
587         if oldname == "" || oldname == "." || oldname == ".." {
588                 return ErrInvalidArgument
589         }
590         olddirf, err := fs.openFile(olddir+".", os.O_RDONLY, 0)
591         if err != nil {
592                 return fmt.Errorf("%q: %s", olddir, err)
593         }
594         defer olddirf.Close()
595
596         newdir, newname := path.Split(newname)
597         if newname == "." || newname == ".." {
598                 return ErrInvalidArgument
599         } else if newname == "" {
600                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
601                 newname = oldname
602         }
603         newdirf, err := fs.openFile(newdir+".", os.O_RDONLY, 0)
604         if err != nil {
605                 return fmt.Errorf("%q: %s", newdir, err)
606         }
607         defer newdirf.Close()
608
609         // TODO: If the nearest common ancestor ("nca") of olddirf and
610         // newdirf is on a different filesystem than fs, we should
611         // call nca.FS().Rename() instead of proceeding. Until then
612         // it's awkward for filesystems to implement their own Rename
613         // methods effectively: the only one that runs is the one on
614         // the root FileSystem exposed to the caller (webdav, fuse,
615         // etc).
616
617         // When acquiring locks on multiple inodes, avoid deadlock by
618         // locking the entire containing filesystem first.
619         cfs := olddirf.inode.FS()
620         cfs.locker().Lock()
621         defer cfs.locker().Unlock()
622
623         if cfs != newdirf.inode.FS() {
624                 // Moving inodes across filesystems is not (yet)
625                 // supported. Locking inodes from different
626                 // filesystems could deadlock, so we must error out
627                 // now.
628                 return ErrInvalidOperation
629         }
630
631         // To ensure we can test reliably whether we're about to move
632         // a directory into itself, lock all potential common
633         // ancestors of olddir and newdir.
634         needLock := []sync.Locker{}
635         for _, node := range []inode{olddirf.inode, newdirf.inode} {
636                 needLock = append(needLock, node)
637                 for node.Parent() != node && node.Parent().FS() == node.FS() {
638                         node = node.Parent()
639                         needLock = append(needLock, node)
640                 }
641         }
642         locked := map[sync.Locker]bool{}
643         for i := len(needLock) - 1; i >= 0; i-- {
644                 n := needLock[i]
645                 if fs, ok := n.(interface{ rootnode() inode }); ok {
646                         // Lock the fs's root dir directly, not
647                         // through the fs. Otherwise our "locked" map
648                         // would not reliably prevent double-locking
649                         // the fs's root dir.
650                         n = fs.rootnode()
651                 }
652                 if !locked[n] {
653                         n.Lock()
654                         defer n.Unlock()
655                         locked[n] = true
656                 }
657         }
658
659         _, err = olddirf.inode.Child(oldname, func(oldinode inode) (inode, error) {
660                 if oldinode == nil {
661                         return oldinode, os.ErrNotExist
662                 }
663                 if locked[oldinode] {
664                         // oldinode cannot become a descendant of itself.
665                         return oldinode, ErrInvalidArgument
666                 }
667                 if oldinode.FS() != cfs && newdirf.inode != olddirf.inode {
668                         // moving a mount point to a different parent
669                         // is not (yet) supported.
670                         return oldinode, ErrInvalidArgument
671                 }
672                 accepted, err := newdirf.inode.Child(newname, func(existing inode) (inode, error) {
673                         if existing != nil && existing.IsDir() {
674                                 return existing, ErrIsDirectory
675                         }
676                         return oldinode, nil
677                 })
678                 if err != nil {
679                         // Leave oldinode in olddir.
680                         return oldinode, err
681                 }
682                 accepted.SetParent(newdirf.inode, newname)
683                 return nil, nil
684         })
685         return err
686 }
687
688 func (fs *fileSystem) Remove(name string) error {
689         return fs.remove(strings.TrimRight(name, "/"), false)
690 }
691
692 func (fs *fileSystem) RemoveAll(name string) error {
693         err := fs.remove(strings.TrimRight(name, "/"), true)
694         if os.IsNotExist(err) {
695                 // "If the path does not exist, RemoveAll returns
696                 // nil." (see "os" pkg)
697                 err = nil
698         }
699         return err
700 }
701
702 func (fs *fileSystem) remove(name string, recursive bool) error {
703         dirname, name := path.Split(name)
704         if name == "" || name == "." || name == ".." {
705                 return ErrInvalidArgument
706         }
707         dir, err := rlookup(fs.root, dirname)
708         if err != nil {
709                 return err
710         }
711         dir.Lock()
712         defer dir.Unlock()
713         _, err = dir.Child(name, func(node inode) (inode, error) {
714                 if node == nil {
715                         return nil, os.ErrNotExist
716                 }
717                 if !recursive && node.IsDir() && node.Size() > 0 {
718                         return node, ErrDirectoryNotEmpty
719                 }
720                 return nil, nil
721         })
722         return err
723 }
724
725 func (fs *fileSystem) Sync() error {
726         if syncer, ok := fs.root.(syncer); ok {
727                 return syncer.Sync()
728         }
729         return ErrInvalidOperation
730 }
731
732 func (fs *fileSystem) Flush(string, bool) error {
733         log.Printf("TODO: flush fileSystem")
734         return ErrInvalidOperation
735 }
736
737 func (fs *fileSystem) MemorySize() int64 {
738         return fs.root.MemorySize()
739 }
740
741 // rlookup (recursive lookup) returns the inode for the file/directory
742 // with the given name (which may contain "/" separators). If no such
743 // file/directory exists, the returned node is nil.
744 func rlookup(start inode, path string) (node inode, err error) {
745         node = start
746         for _, name := range strings.Split(path, "/") {
747                 if node.IsDir() {
748                         if name == "." || name == "" {
749                                 continue
750                         }
751                         if name == ".." {
752                                 node = node.Parent()
753                                 continue
754                         }
755                 }
756                 node, err = func() (inode, error) {
757                         node.Lock()
758                         defer node.Unlock()
759                         return node.Child(name, nil)
760                 }()
761                 if node == nil || err != nil {
762                         break
763                 }
764         }
765         if node == nil && err == nil {
766                 err = os.ErrNotExist
767         }
768         return
769 }
770
771 func permittedName(name string) bool {
772         return name != "" && name != "." && name != ".." && !strings.Contains(name, "/")
773 }
774
775 // Snapshot returns a Subtree that's a copy of the given path. It
776 // returns an error if the path is not inside a collection.
777 func Snapshot(fs FileSystem, path string) (*Subtree, error) {
778         f, err := fs.OpenFile(path, os.O_RDONLY, 0)
779         if err != nil {
780                 return nil, err
781         }
782         defer f.Close()
783         return f.Snapshot()
784 }
785
786 // Splice inserts newsubtree at the indicated target path.
787 //
788 // Splice returns an error if target is not inside a collection.
789 //
790 // Splice returns an error if target is the root of a collection and
791 // newsubtree is a snapshot of a file.
792 func Splice(fs FileSystem, target string, newsubtree *Subtree) error {
793         f, err := fs.OpenFile(target, os.O_WRONLY, 0)
794         if os.IsNotExist(err) {
795                 f, err = fs.OpenFile(target, os.O_CREATE|os.O_WRONLY, 0700)
796         }
797         if err != nil {
798                 return fmt.Errorf("open %s: %w", target, err)
799         }
800         defer f.Close()
801         return f.Splice(newsubtree)
802 }