Merge branch '18600-copy-by-ref'
[arvados.git] / sdk / go / arvados / fs_base.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         "net/http"
13         "os"
14         "path"
15         "strings"
16         "sync"
17         "time"
18 )
19
20 var (
21         ErrReadOnlyFile      = errors.New("read-only file")
22         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
23         ErrFileExists        = errors.New("file exists")
24         ErrInvalidOperation  = errors.New("invalid operation")
25         ErrInvalidArgument   = errors.New("invalid argument")
26         ErrDirectoryNotEmpty = errors.New("directory not empty")
27         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
28         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
29         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
30         ErrNotADirectory     = errors.New("not a directory")
31         ErrPermission        = os.ErrPermission
32         DebugLocksPanicMode  = false
33 )
34
35 type syncer interface {
36         Sync() error
37 }
38
39 func debugPanicIfNotLocked(l sync.Locker, writing bool) {
40         if !DebugLocksPanicMode {
41                 return
42         }
43         race := false
44         if rl, ok := l.(interface {
45                 RLock()
46                 RUnlock()
47         }); ok && writing {
48                 go func() {
49                         // Fail if we can grab the read lock during an
50                         // operation that purportedly has write lock.
51                         rl.RLock()
52                         race = true
53                         rl.RUnlock()
54                 }()
55         } else {
56                 go func() {
57                         l.Lock()
58                         race = true
59                         l.Unlock()
60                 }()
61         }
62         time.Sleep(100)
63         if race {
64                 panic("bug: caller-must-have-lock func called, but nobody has lock")
65         }
66 }
67
68 // A File is an *os.File-like interface for reading and writing files
69 // in a FileSystem.
70 type File interface {
71         io.Reader
72         io.Writer
73         io.Closer
74         io.Seeker
75         Size() int64
76         Readdir(int) ([]os.FileInfo, error)
77         Stat() (os.FileInfo, error)
78         Truncate(int64) error
79         Sync() error
80         // Create a snapshot of a file or directory tree, which can
81         // then be spliced onto a different path or a different
82         // collection.
83         Snapshot() (*Subtree, error)
84         // Replace this file or directory with the given snapshot.
85         // The target must be inside a collection: Splice returns an
86         // error if the File is a virtual file or directory like
87         // by_id, a project directory, .arvados#collection,
88         // etc. Splice can replace directories with regular files and
89         // vice versa, except it cannot replace the root directory of
90         // a collection with a regular file.
91         Splice(snapshot *Subtree) error
92 }
93
94 // A Subtree is a detached part of a filesystem tree that can be
95 // spliced into a filesystem via (File)Splice().
96 type Subtree struct {
97         inode inode
98 }
99
100 // A FileSystem is an http.Filesystem plus Stat() and support for
101 // opening writable files. All methods are safe to call from multiple
102 // goroutines.
103 type FileSystem interface {
104         http.FileSystem
105         fsBackend
106
107         rootnode() inode
108
109         // filesystem-wide lock: used by Rename() to prevent deadlock
110         // while locking multiple inodes.
111         locker() sync.Locker
112
113         // throttle for limiting concurrent background writers
114         throttle() *throttle
115
116         // create a new node with nil parent.
117         newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
118
119         // analogous to os.Stat()
120         Stat(name string) (os.FileInfo, error)
121
122         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
123         Create(name string) (File, error)
124
125         // Like os.OpenFile(): create or open a file or directory.
126         //
127         // If flag&os.O_EXCL==0, it opens an existing file or
128         // directory if one exists. If flag&os.O_CREATE!=0, it creates
129         // a new empty file or directory if one does not already
130         // exist.
131         //
132         // When creating a new item, perm&os.ModeDir determines
133         // whether it is a file or a directory.
134         //
135         // A file can be opened multiple times and used concurrently
136         // from multiple goroutines. However, each File object should
137         // be used by only one goroutine at a time.
138         OpenFile(name string, flag int, perm os.FileMode) (File, error)
139
140         Mkdir(name string, perm os.FileMode) error
141         Remove(name string) error
142         RemoveAll(name string) error
143         Rename(oldname, newname string) error
144
145         // Write buffered data from memory to storage, returning when
146         // all updates have been saved to persistent storage.
147         Sync() error
148
149         // Write buffered data from memory to storage, but don't wait
150         // for all writes to finish before returning. If shortBlocks
151         // is true, flush everything; otherwise, if there's less than
152         // a full block of buffered data at the end of a stream, leave
153         // it buffered in memory in case more data can be appended. If
154         // path is "", flush all dirs/streams; otherwise, flush only
155         // the specified dir/stream.
156         Flush(path string, shortBlocks bool) error
157
158         // Estimate current memory usage.
159         MemorySize() int64
160 }
161
162 type inode interface {
163         SetParent(parent inode, name string)
164         Parent() inode
165         FS() FileSystem
166         Read([]byte, filenodePtr) (int, filenodePtr, error)
167         Write([]byte, filenodePtr) (int, filenodePtr, error)
168         Truncate(int64) error
169         IsDir() bool
170         Readdir() ([]os.FileInfo, error)
171         Size() int64
172         FileInfo() os.FileInfo
173         // Create a snapshot of this node and its descendants.
174         Snapshot() (inode, error)
175         // Replace this node with a copy of the provided snapshot.
176         // Caller may provide the same snapshot to multiple Splice
177         // calls, but must not modify the snapshot concurrently.
178         Splice(inode) error
179
180         // Child() performs lookups and updates of named child nodes.
181         //
182         // (The term "child" here is used strictly. This means name is
183         // not "." or "..", and name does not contain "/".)
184         //
185         // If replace is non-nil, Child calls replace(x) where x is
186         // the current child inode with the given name. If possible,
187         // the child inode is replaced with the one returned by
188         // replace().
189         //
190         // If replace(x) returns an inode (besides x or nil) that is
191         // subsequently returned by Child(), then Child()'s caller
192         // must ensure the new child's name and parent are set/updated
193         // to Child()'s name argument and its receiver respectively.
194         // This is not necessarily done before replace(x) returns, but
195         // it must be done before Child()'s caller releases the
196         // parent's lock.
197         //
198         // Nil represents "no child". replace(nil) signifies that no
199         // child with this name exists yet. If replace() returns nil,
200         // the existing child should be deleted if possible.
201         //
202         // An implementation of Child() is permitted to ignore
203         // replace() or its return value. For example, a regular file
204         // inode does not have children, so Child() always returns
205         // nil.
206         //
207         // Child() returns the child, if any, with the given name: if
208         // a child was added or changed, the new child is returned.
209         //
210         // Caller must have lock (or rlock if replace is nil).
211         Child(name string, replace func(inode) (inode, error)) (inode, error)
212
213         sync.Locker
214         RLock()
215         RUnlock()
216         MemorySize() int64
217 }
218
219 type fileinfo struct {
220         name    string
221         mode    os.FileMode
222         size    int64
223         modTime time.Time
224 }
225
226 // Name implements os.FileInfo.
227 func (fi fileinfo) Name() string {
228         return fi.name
229 }
230
231 // ModTime implements os.FileInfo.
232 func (fi fileinfo) ModTime() time.Time {
233         return fi.modTime
234 }
235
236 // Mode implements os.FileInfo.
237 func (fi fileinfo) Mode() os.FileMode {
238         return fi.mode
239 }
240
241 // IsDir implements os.FileInfo.
242 func (fi fileinfo) IsDir() bool {
243         return fi.mode&os.ModeDir != 0
244 }
245
246 // Size implements os.FileInfo.
247 func (fi fileinfo) Size() int64 {
248         return fi.size
249 }
250
251 // Sys implements os.FileInfo.
252 func (fi fileinfo) Sys() interface{} {
253         return nil
254 }
255
256 type nullnode struct{}
257
258 func (*nullnode) Mkdir(string, os.FileMode) error {
259         return ErrInvalidOperation
260 }
261
262 func (*nullnode) Read([]byte, filenodePtr) (int, filenodePtr, error) {
263         return 0, filenodePtr{}, ErrInvalidOperation
264 }
265
266 func (*nullnode) Write([]byte, filenodePtr) (int, filenodePtr, error) {
267         return 0, filenodePtr{}, ErrInvalidOperation
268 }
269
270 func (*nullnode) Truncate(int64) error {
271         return ErrInvalidOperation
272 }
273
274 func (*nullnode) FileInfo() os.FileInfo {
275         return fileinfo{}
276 }
277
278 func (*nullnode) IsDir() bool {
279         return false
280 }
281
282 func (*nullnode) Readdir() ([]os.FileInfo, error) {
283         return nil, ErrInvalidOperation
284 }
285
286 func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
287         return nil, ErrNotADirectory
288 }
289
290 func (*nullnode) MemorySize() int64 {
291         // Types that embed nullnode should report their own size, but
292         // if they don't, we at least report a non-zero size to ensure
293         // a large tree doesn't get reported as 0 bytes.
294         return 64
295 }
296
297 func (*nullnode) Snapshot() (inode, error) {
298         return nil, ErrInvalidOperation
299 }
300
301 func (*nullnode) Splice(inode) error {
302         return ErrInvalidOperation
303 }
304
305 type treenode struct {
306         fs       FileSystem
307         parent   inode
308         inodes   map[string]inode
309         fileinfo fileinfo
310         sync.RWMutex
311         nullnode
312 }
313
314 func (n *treenode) FS() FileSystem {
315         return n.fs
316 }
317
318 func (n *treenode) SetParent(p inode, name string) {
319         n.Lock()
320         defer n.Unlock()
321         n.parent = p
322         n.fileinfo.name = name
323 }
324
325 func (n *treenode) Parent() inode {
326         n.RLock()
327         defer n.RUnlock()
328         return n.parent
329 }
330
331 func (n *treenode) IsDir() bool {
332         return true
333 }
334
335 func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) {
336         debugPanicIfNotLocked(n, false)
337         child = n.inodes[name]
338         if name == "" || name == "." || name == ".." {
339                 err = ErrInvalidArgument
340                 return
341         }
342         if replace == nil {
343                 return
344         }
345         newchild, err := replace(child)
346         if err != nil {
347                 return
348         }
349         if newchild == nil {
350                 debugPanicIfNotLocked(n, true)
351                 delete(n.inodes, name)
352         } else if newchild != child {
353                 debugPanicIfNotLocked(n, true)
354                 n.inodes[name] = newchild
355                 n.fileinfo.modTime = time.Now()
356                 child = newchild
357         }
358         return
359 }
360
361 func (n *treenode) Size() int64 {
362         return n.FileInfo().Size()
363 }
364
365 func (n *treenode) FileInfo() os.FileInfo {
366         n.Lock()
367         defer n.Unlock()
368         n.fileinfo.size = int64(len(n.inodes))
369         return n.fileinfo
370 }
371
372 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
373         n.RLock()
374         defer n.RUnlock()
375         fi = make([]os.FileInfo, 0, len(n.inodes))
376         for _, inode := range n.inodes {
377                 fi = append(fi, inode.FileInfo())
378         }
379         return
380 }
381
382 func (n *treenode) Sync() error {
383         n.RLock()
384         defer n.RUnlock()
385         for _, inode := range n.inodes {
386                 syncer, ok := inode.(syncer)
387                 if !ok {
388                         return ErrInvalidOperation
389                 }
390                 err := syncer.Sync()
391                 if err != nil {
392                         return err
393                 }
394         }
395         return nil
396 }
397
398 func (n *treenode) MemorySize() (size int64) {
399         n.RLock()
400         defer n.RUnlock()
401         debugPanicIfNotLocked(n, false)
402         for _, inode := range n.inodes {
403                 size += inode.MemorySize()
404         }
405         return
406 }
407
408 type fileSystem struct {
409         root inode
410         fsBackend
411         mutex sync.Mutex
412         thr   *throttle
413 }
414
415 func (fs *fileSystem) rootnode() inode {
416         return fs.root
417 }
418
419 func (fs *fileSystem) throttle() *throttle {
420         return fs.thr
421 }
422
423 func (fs *fileSystem) locker() sync.Locker {
424         return &fs.mutex
425 }
426
427 // OpenFile is analogous to os.OpenFile().
428 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
429         return fs.openFile(name, flag, perm)
430 }
431
432 func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
433         if flag&os.O_SYNC != 0 {
434                 return nil, ErrSyncNotSupported
435         }
436         dirname, name := path.Split(name)
437         parent, err := rlookup(fs.root, dirname)
438         if err != nil {
439                 return nil, err
440         }
441         var readable, writable bool
442         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
443         case os.O_RDWR:
444                 readable = true
445                 writable = true
446         case os.O_RDONLY:
447                 readable = true
448         case os.O_WRONLY:
449                 writable = true
450         default:
451                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
452         }
453         if !writable && parent.IsDir() {
454                 // A directory can be opened via "foo/", "foo/.", or
455                 // "foo/..".
456                 switch name {
457                 case ".", "":
458                         return &filehandle{inode: parent}, nil
459                 case "..":
460                         return &filehandle{inode: parent.Parent()}, nil
461                 }
462         }
463         createMode := flag&os.O_CREATE != 0
464         // We always need to take Lock() here, not just RLock(). Even
465         // if we know we won't be creating a file, parent might be a
466         // lookupnode, which sometimes populates its inodes map during
467         // a Child() call.
468         parent.Lock()
469         defer parent.Unlock()
470         n, err := parent.Child(name, nil)
471         if err != nil {
472                 return nil, err
473         } else if n == nil {
474                 if !createMode {
475                         return nil, os.ErrNotExist
476                 }
477                 n, err = parent.Child(name, func(inode) (repl inode, err error) {
478                         repl, err = parent.FS().newNode(name, perm|0755, time.Now())
479                         if err != nil {
480                                 return
481                         }
482                         repl.SetParent(parent, name)
483                         return
484                 })
485                 if err != nil {
486                         return nil, err
487                 } else if n == nil {
488                         // Parent rejected new child, but returned no error
489                         return nil, ErrInvalidArgument
490                 }
491         } else if flag&os.O_EXCL != 0 {
492                 return nil, ErrFileExists
493         } else if flag&os.O_TRUNC != 0 {
494                 if !writable {
495                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
496                 } else if n.IsDir() {
497                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
498                 } else if err := n.Truncate(0); err != nil {
499                         return nil, err
500                 }
501         }
502         return &filehandle{
503                 inode:    n,
504                 append:   flag&os.O_APPEND != 0,
505                 readable: readable,
506                 writable: writable,
507         }, nil
508 }
509
510 func (fs *fileSystem) Open(name string) (http.File, error) {
511         return fs.OpenFile(name, os.O_RDONLY, 0)
512 }
513
514 func (fs *fileSystem) Create(name string) (File, error) {
515         return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
516 }
517
518 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
519         dirname, name := path.Split(name)
520         n, err := rlookup(fs.root, dirname)
521         if err != nil {
522                 return err
523         }
524         n.Lock()
525         defer n.Unlock()
526         if child, err := n.Child(name, nil); err != nil {
527                 return err
528         } else if child != nil {
529                 return os.ErrExist
530         }
531
532         _, err = n.Child(name, func(inode) (repl inode, err error) {
533                 repl, err = n.FS().newNode(name, perm|os.ModeDir, time.Now())
534                 if err != nil {
535                         return
536                 }
537                 repl.SetParent(n, name)
538                 return
539         })
540         return err
541 }
542
543 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
544         node, err := rlookup(fs.root, name)
545         if err != nil {
546                 return nil, err
547         }
548         return node.FileInfo(), nil
549 }
550
551 func (fs *fileSystem) Rename(oldname, newname string) error {
552         olddir, oldname := path.Split(oldname)
553         if oldname == "" || oldname == "." || oldname == ".." {
554                 return ErrInvalidArgument
555         }
556         olddirf, err := fs.openFile(olddir+".", os.O_RDONLY, 0)
557         if err != nil {
558                 return fmt.Errorf("%q: %s", olddir, err)
559         }
560         defer olddirf.Close()
561
562         newdir, newname := path.Split(newname)
563         if newname == "." || newname == ".." {
564                 return ErrInvalidArgument
565         } else if newname == "" {
566                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
567                 newname = oldname
568         }
569         newdirf, err := fs.openFile(newdir+".", os.O_RDONLY, 0)
570         if err != nil {
571                 return fmt.Errorf("%q: %s", newdir, err)
572         }
573         defer newdirf.Close()
574
575         // TODO: If the nearest common ancestor ("nca") of olddirf and
576         // newdirf is on a different filesystem than fs, we should
577         // call nca.FS().Rename() instead of proceeding. Until then
578         // it's awkward for filesystems to implement their own Rename
579         // methods effectively: the only one that runs is the one on
580         // the root FileSystem exposed to the caller (webdav, fuse,
581         // etc).
582
583         // When acquiring locks on multiple inodes, avoid deadlock by
584         // locking the entire containing filesystem first.
585         cfs := olddirf.inode.FS()
586         cfs.locker().Lock()
587         defer cfs.locker().Unlock()
588
589         if cfs != newdirf.inode.FS() {
590                 // Moving inodes across filesystems is not (yet)
591                 // supported. Locking inodes from different
592                 // filesystems could deadlock, so we must error out
593                 // now.
594                 return ErrInvalidOperation
595         }
596
597         // To ensure we can test reliably whether we're about to move
598         // a directory into itself, lock all potential common
599         // ancestors of olddir and newdir.
600         needLock := []sync.Locker{}
601         for _, node := range []inode{olddirf.inode, newdirf.inode} {
602                 needLock = append(needLock, node)
603                 for node.Parent() != node && node.Parent().FS() == node.FS() {
604                         node = node.Parent()
605                         needLock = append(needLock, node)
606                 }
607         }
608         locked := map[sync.Locker]bool{}
609         for i := len(needLock) - 1; i >= 0; i-- {
610                 if n := needLock[i]; !locked[n] {
611                         n.Lock()
612                         defer n.Unlock()
613                         locked[n] = true
614                 }
615         }
616
617         _, err = olddirf.inode.Child(oldname, func(oldinode inode) (inode, error) {
618                 if oldinode == nil {
619                         return oldinode, os.ErrNotExist
620                 }
621                 if locked[oldinode] {
622                         // oldinode cannot become a descendant of itself.
623                         return oldinode, ErrInvalidArgument
624                 }
625                 if oldinode.FS() != cfs && newdirf.inode != olddirf.inode {
626                         // moving a mount point to a different parent
627                         // is not (yet) supported.
628                         return oldinode, ErrInvalidArgument
629                 }
630                 accepted, err := newdirf.inode.Child(newname, func(existing inode) (inode, error) {
631                         if existing != nil && existing.IsDir() {
632                                 return existing, ErrIsDirectory
633                         }
634                         return oldinode, nil
635                 })
636                 if err != nil {
637                         // Leave oldinode in olddir.
638                         return oldinode, err
639                 }
640                 accepted.SetParent(newdirf.inode, newname)
641                 return nil, nil
642         })
643         return err
644 }
645
646 func (fs *fileSystem) Remove(name string) error {
647         return fs.remove(strings.TrimRight(name, "/"), false)
648 }
649
650 func (fs *fileSystem) RemoveAll(name string) error {
651         err := fs.remove(strings.TrimRight(name, "/"), true)
652         if os.IsNotExist(err) {
653                 // "If the path does not exist, RemoveAll returns
654                 // nil." (see "os" pkg)
655                 err = nil
656         }
657         return err
658 }
659
660 func (fs *fileSystem) remove(name string, recursive bool) error {
661         dirname, name := path.Split(name)
662         if name == "" || name == "." || name == ".." {
663                 return ErrInvalidArgument
664         }
665         dir, err := rlookup(fs.root, dirname)
666         if err != nil {
667                 return err
668         }
669         dir.Lock()
670         defer dir.Unlock()
671         _, err = dir.Child(name, func(node inode) (inode, error) {
672                 if node == nil {
673                         return nil, os.ErrNotExist
674                 }
675                 if !recursive && node.IsDir() && node.Size() > 0 {
676                         return node, ErrDirectoryNotEmpty
677                 }
678                 return nil, nil
679         })
680         return err
681 }
682
683 func (fs *fileSystem) Sync() error {
684         if syncer, ok := fs.root.(syncer); ok {
685                 return syncer.Sync()
686         }
687         return ErrInvalidOperation
688 }
689
690 func (fs *fileSystem) Flush(string, bool) error {
691         log.Printf("TODO: flush fileSystem")
692         return ErrInvalidOperation
693 }
694
695 func (fs *fileSystem) MemorySize() int64 {
696         return fs.root.MemorySize()
697 }
698
699 // rlookup (recursive lookup) returns the inode for the file/directory
700 // with the given name (which may contain "/" separators). If no such
701 // file/directory exists, the returned node is nil.
702 func rlookup(start inode, path string) (node inode, err error) {
703         node = start
704         for _, name := range strings.Split(path, "/") {
705                 if node.IsDir() {
706                         if name == "." || name == "" {
707                                 continue
708                         }
709                         if name == ".." {
710                                 node = node.Parent()
711                                 continue
712                         }
713                 }
714                 node, err = func() (inode, error) {
715                         node.Lock()
716                         defer node.Unlock()
717                         return node.Child(name, nil)
718                 }()
719                 if node == nil || err != nil {
720                         break
721                 }
722         }
723         if node == nil && err == nil {
724                 err = os.ErrNotExist
725         }
726         return
727 }
728
729 func permittedName(name string) bool {
730         return name != "" && name != "." && name != ".." && !strings.Contains(name, "/")
731 }
732
733 // Snapshot returns a Subtree that's a copy of the given path. It
734 // returns an error if the path is not inside a collection.
735 func Snapshot(fs FileSystem, path string) (*Subtree, error) {
736         f, err := fs.OpenFile(path, os.O_RDONLY, 0)
737         if err != nil {
738                 return nil, err
739         }
740         defer f.Close()
741         return f.Snapshot()
742 }
743
744 // Splice inserts newsubtree at the indicated target path.
745 //
746 // Splice returns an error if target is not inside a collection.
747 //
748 // Splice returns an error if target is the root of a collection and
749 // newsubtree is a snapshot of a file.
750 func Splice(fs FileSystem, target string, newsubtree *Subtree) error {
751         f, err := fs.OpenFile(target, os.O_WRONLY, 0)
752         if os.IsNotExist(err) {
753                 f, err = fs.OpenFile(target, os.O_CREATE|os.O_WRONLY, 0700)
754         }
755         if err != nil {
756                 return err
757         }
758         defer f.Close()
759         return f.Splice(newsubtree)
760 }