Merge branch '17351-arvput-keepclient-storage-support'
[arvados.git] / sdk / go / arvados / fs_base.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         "net/http"
13         "os"
14         "path"
15         "strings"
16         "sync"
17         "time"
18 )
19
20 var (
21         ErrReadOnlyFile      = errors.New("read-only file")
22         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
23         ErrFileExists        = errors.New("file exists")
24         ErrInvalidOperation  = errors.New("invalid operation")
25         ErrInvalidArgument   = errors.New("invalid argument")
26         ErrDirectoryNotEmpty = errors.New("directory not empty")
27         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
28         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
29         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
30         ErrNotADirectory     = errors.New("not a directory")
31         ErrPermission        = os.ErrPermission
32 )
33
34 type syncer interface {
35         Sync() error
36 }
37
38 // A File is an *os.File-like interface for reading and writing files
39 // in a FileSystem.
40 type File interface {
41         io.Reader
42         io.Writer
43         io.Closer
44         io.Seeker
45         Size() int64
46         Readdir(int) ([]os.FileInfo, error)
47         Stat() (os.FileInfo, error)
48         Truncate(int64) error
49         Sync() error
50 }
51
52 // A FileSystem is an http.Filesystem plus Stat() and support for
53 // opening writable files. All methods are safe to call from multiple
54 // goroutines.
55 type FileSystem interface {
56         http.FileSystem
57         fsBackend
58
59         rootnode() inode
60
61         // filesystem-wide lock: used by Rename() to prevent deadlock
62         // while locking multiple inodes.
63         locker() sync.Locker
64
65         // throttle for limiting concurrent background writers
66         throttle() *throttle
67
68         // create a new node with nil parent.
69         newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
70
71         // analogous to os.Stat()
72         Stat(name string) (os.FileInfo, error)
73
74         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
75         Create(name string) (File, error)
76
77         // Like os.OpenFile(): create or open a file or directory.
78         //
79         // If flag&os.O_EXCL==0, it opens an existing file or
80         // directory if one exists. If flag&os.O_CREATE!=0, it creates
81         // a new empty file or directory if one does not already
82         // exist.
83         //
84         // When creating a new item, perm&os.ModeDir determines
85         // whether it is a file or a directory.
86         //
87         // A file can be opened multiple times and used concurrently
88         // from multiple goroutines. However, each File object should
89         // be used by only one goroutine at a time.
90         OpenFile(name string, flag int, perm os.FileMode) (File, error)
91
92         Mkdir(name string, perm os.FileMode) error
93         Remove(name string) error
94         RemoveAll(name string) error
95         Rename(oldname, newname string) error
96
97         // Write buffered data from memory to storage, returning when
98         // all updates have been saved to persistent storage.
99         Sync() error
100
101         // Write buffered data from memory to storage, but don't wait
102         // for all writes to finish before returning. If shortBlocks
103         // is true, flush everything; otherwise, if there's less than
104         // a full block of buffered data at the end of a stream, leave
105         // it buffered in memory in case more data can be appended. If
106         // path is "", flush all dirs/streams; otherwise, flush only
107         // the specified dir/stream.
108         Flush(path string, shortBlocks bool) error
109
110         // Estimate current memory usage.
111         MemorySize() int64
112 }
113
114 type inode interface {
115         SetParent(parent inode, name string)
116         Parent() inode
117         FS() FileSystem
118         Read([]byte, filenodePtr) (int, filenodePtr, error)
119         Write([]byte, filenodePtr) (int, filenodePtr, error)
120         Truncate(int64) error
121         IsDir() bool
122         Readdir() ([]os.FileInfo, error)
123         Size() int64
124         FileInfo() os.FileInfo
125
126         // Child() performs lookups and updates of named child nodes.
127         //
128         // (The term "child" here is used strictly. This means name is
129         // not "." or "..", and name does not contain "/".)
130         //
131         // If replace is non-nil, Child calls replace(x) where x is
132         // the current child inode with the given name. If possible,
133         // the child inode is replaced with the one returned by
134         // replace().
135         //
136         // If replace(x) returns an inode (besides x or nil) that is
137         // subsequently returned by Child(), then Child()'s caller
138         // must ensure the new child's name and parent are set/updated
139         // to Child()'s name argument and its receiver respectively.
140         // This is not necessarily done before replace(x) returns, but
141         // it must be done before Child()'s caller releases the
142         // parent's lock.
143         //
144         // Nil represents "no child". replace(nil) signifies that no
145         // child with this name exists yet. If replace() returns nil,
146         // the existing child should be deleted if possible.
147         //
148         // An implementation of Child() is permitted to ignore
149         // replace() or its return value. For example, a regular file
150         // inode does not have children, so Child() always returns
151         // nil.
152         //
153         // Child() returns the child, if any, with the given name: if
154         // a child was added or changed, the new child is returned.
155         //
156         // Caller must have lock (or rlock if replace is nil).
157         Child(name string, replace func(inode) (inode, error)) (inode, error)
158
159         sync.Locker
160         RLock()
161         RUnlock()
162         MemorySize() int64
163 }
164
165 type fileinfo struct {
166         name    string
167         mode    os.FileMode
168         size    int64
169         modTime time.Time
170 }
171
172 // Name implements os.FileInfo.
173 func (fi fileinfo) Name() string {
174         return fi.name
175 }
176
177 // ModTime implements os.FileInfo.
178 func (fi fileinfo) ModTime() time.Time {
179         return fi.modTime
180 }
181
182 // Mode implements os.FileInfo.
183 func (fi fileinfo) Mode() os.FileMode {
184         return fi.mode
185 }
186
187 // IsDir implements os.FileInfo.
188 func (fi fileinfo) IsDir() bool {
189         return fi.mode&os.ModeDir != 0
190 }
191
192 // Size implements os.FileInfo.
193 func (fi fileinfo) Size() int64 {
194         return fi.size
195 }
196
197 // Sys implements os.FileInfo.
198 func (fi fileinfo) Sys() interface{} {
199         return nil
200 }
201
202 type nullnode struct{}
203
204 func (*nullnode) Mkdir(string, os.FileMode) error {
205         return ErrInvalidOperation
206 }
207
208 func (*nullnode) Read([]byte, filenodePtr) (int, filenodePtr, error) {
209         return 0, filenodePtr{}, ErrInvalidOperation
210 }
211
212 func (*nullnode) Write([]byte, filenodePtr) (int, filenodePtr, error) {
213         return 0, filenodePtr{}, ErrInvalidOperation
214 }
215
216 func (*nullnode) Truncate(int64) error {
217         return ErrInvalidOperation
218 }
219
220 func (*nullnode) FileInfo() os.FileInfo {
221         return fileinfo{}
222 }
223
224 func (*nullnode) IsDir() bool {
225         return false
226 }
227
228 func (*nullnode) Readdir() ([]os.FileInfo, error) {
229         return nil, ErrInvalidOperation
230 }
231
232 func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
233         return nil, ErrNotADirectory
234 }
235
236 func (*nullnode) MemorySize() int64 {
237         // Types that embed nullnode should report their own size, but
238         // if they don't, we at least report a non-zero size to ensure
239         // a large tree doesn't get reported as 0 bytes.
240         return 64
241 }
242
243 type treenode struct {
244         fs       FileSystem
245         parent   inode
246         inodes   map[string]inode
247         fileinfo fileinfo
248         sync.RWMutex
249         nullnode
250 }
251
252 func (n *treenode) FS() FileSystem {
253         return n.fs
254 }
255
256 func (n *treenode) SetParent(p inode, name string) {
257         n.Lock()
258         defer n.Unlock()
259         n.parent = p
260         n.fileinfo.name = name
261 }
262
263 func (n *treenode) Parent() inode {
264         n.RLock()
265         defer n.RUnlock()
266         return n.parent
267 }
268
269 func (n *treenode) IsDir() bool {
270         return true
271 }
272
273 func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) {
274         child = n.inodes[name]
275         if name == "" || name == "." || name == ".." {
276                 err = ErrInvalidArgument
277                 return
278         }
279         if replace == nil {
280                 return
281         }
282         newchild, err := replace(child)
283         if err != nil {
284                 return
285         }
286         if newchild == nil {
287                 delete(n.inodes, name)
288         } else if newchild != child {
289                 n.inodes[name] = newchild
290                 n.fileinfo.modTime = time.Now()
291                 child = newchild
292         }
293         return
294 }
295
296 func (n *treenode) Size() int64 {
297         return n.FileInfo().Size()
298 }
299
300 func (n *treenode) FileInfo() os.FileInfo {
301         n.Lock()
302         defer n.Unlock()
303         n.fileinfo.size = int64(len(n.inodes))
304         return n.fileinfo
305 }
306
307 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
308         n.RLock()
309         defer n.RUnlock()
310         fi = make([]os.FileInfo, 0, len(n.inodes))
311         for _, inode := range n.inodes {
312                 fi = append(fi, inode.FileInfo())
313         }
314         return
315 }
316
317 func (n *treenode) Sync() error {
318         n.RLock()
319         defer n.RUnlock()
320         for _, inode := range n.inodes {
321                 syncer, ok := inode.(syncer)
322                 if !ok {
323                         return ErrInvalidOperation
324                 }
325                 err := syncer.Sync()
326                 if err != nil {
327                         return err
328                 }
329         }
330         return nil
331 }
332
333 func (n *treenode) MemorySize() (size int64) {
334         n.RLock()
335         defer n.RUnlock()
336         for _, inode := range n.inodes {
337                 size += inode.MemorySize()
338         }
339         return
340 }
341
342 type fileSystem struct {
343         root inode
344         fsBackend
345         mutex sync.Mutex
346         thr   *throttle
347 }
348
349 func (fs *fileSystem) rootnode() inode {
350         return fs.root
351 }
352
353 func (fs *fileSystem) throttle() *throttle {
354         return fs.thr
355 }
356
357 func (fs *fileSystem) locker() sync.Locker {
358         return &fs.mutex
359 }
360
361 // OpenFile is analogous to os.OpenFile().
362 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
363         return fs.openFile(name, flag, perm)
364 }
365
366 func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
367         if flag&os.O_SYNC != 0 {
368                 return nil, ErrSyncNotSupported
369         }
370         dirname, name := path.Split(name)
371         parent, err := rlookup(fs.root, dirname)
372         if err != nil {
373                 return nil, err
374         }
375         var readable, writable bool
376         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
377         case os.O_RDWR:
378                 readable = true
379                 writable = true
380         case os.O_RDONLY:
381                 readable = true
382         case os.O_WRONLY:
383                 writable = true
384         default:
385                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
386         }
387         if !writable && parent.IsDir() {
388                 // A directory can be opened via "foo/", "foo/.", or
389                 // "foo/..".
390                 switch name {
391                 case ".", "":
392                         return &filehandle{inode: parent}, nil
393                 case "..":
394                         return &filehandle{inode: parent.Parent()}, nil
395                 }
396         }
397         createMode := flag&os.O_CREATE != 0
398         if createMode {
399                 parent.Lock()
400                 defer parent.Unlock()
401         } else {
402                 parent.RLock()
403                 defer parent.RUnlock()
404         }
405         n, err := parent.Child(name, nil)
406         if err != nil {
407                 return nil, err
408         } else if n == nil {
409                 if !createMode {
410                         return nil, os.ErrNotExist
411                 }
412                 n, err = parent.Child(name, func(inode) (repl inode, err error) {
413                         repl, err = parent.FS().newNode(name, perm|0755, time.Now())
414                         if err != nil {
415                                 return
416                         }
417                         repl.SetParent(parent, name)
418                         return
419                 })
420                 if err != nil {
421                         return nil, err
422                 } else if n == nil {
423                         // Parent rejected new child, but returned no error
424                         return nil, ErrInvalidArgument
425                 }
426         } else if flag&os.O_EXCL != 0 {
427                 return nil, ErrFileExists
428         } else if flag&os.O_TRUNC != 0 {
429                 if !writable {
430                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
431                 } else if n.IsDir() {
432                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
433                 } else if err := n.Truncate(0); err != nil {
434                         return nil, err
435                 }
436         }
437         return &filehandle{
438                 inode:    n,
439                 append:   flag&os.O_APPEND != 0,
440                 readable: readable,
441                 writable: writable,
442         }, nil
443 }
444
445 func (fs *fileSystem) Open(name string) (http.File, error) {
446         return fs.OpenFile(name, os.O_RDONLY, 0)
447 }
448
449 func (fs *fileSystem) Create(name string) (File, error) {
450         return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
451 }
452
453 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
454         dirname, name := path.Split(name)
455         n, err := rlookup(fs.root, dirname)
456         if err != nil {
457                 return err
458         }
459         n.Lock()
460         defer n.Unlock()
461         if child, err := n.Child(name, nil); err != nil {
462                 return err
463         } else if child != nil {
464                 return os.ErrExist
465         }
466
467         _, err = n.Child(name, func(inode) (repl inode, err error) {
468                 repl, err = n.FS().newNode(name, perm|os.ModeDir, time.Now())
469                 if err != nil {
470                         return
471                 }
472                 repl.SetParent(n, name)
473                 return
474         })
475         return err
476 }
477
478 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
479         node, err := rlookup(fs.root, name)
480         if err != nil {
481                 return nil, err
482         }
483         return node.FileInfo(), nil
484 }
485
486 func (fs *fileSystem) Rename(oldname, newname string) error {
487         olddir, oldname := path.Split(oldname)
488         if oldname == "" || oldname == "." || oldname == ".." {
489                 return ErrInvalidArgument
490         }
491         olddirf, err := fs.openFile(olddir+".", os.O_RDONLY, 0)
492         if err != nil {
493                 return fmt.Errorf("%q: %s", olddir, err)
494         }
495         defer olddirf.Close()
496
497         newdir, newname := path.Split(newname)
498         if newname == "." || newname == ".." {
499                 return ErrInvalidArgument
500         } else if newname == "" {
501                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
502                 newname = oldname
503         }
504         newdirf, err := fs.openFile(newdir+".", os.O_RDONLY, 0)
505         if err != nil {
506                 return fmt.Errorf("%q: %s", newdir, err)
507         }
508         defer newdirf.Close()
509
510         // TODO: If the nearest common ancestor ("nca") of olddirf and
511         // newdirf is on a different filesystem than fs, we should
512         // call nca.FS().Rename() instead of proceeding. Until then
513         // it's awkward for filesystems to implement their own Rename
514         // methods effectively: the only one that runs is the one on
515         // the root FileSystem exposed to the caller (webdav, fuse,
516         // etc).
517
518         // When acquiring locks on multiple inodes, avoid deadlock by
519         // locking the entire containing filesystem first.
520         cfs := olddirf.inode.FS()
521         cfs.locker().Lock()
522         defer cfs.locker().Unlock()
523
524         if cfs != newdirf.inode.FS() {
525                 // Moving inodes across filesystems is not (yet)
526                 // supported. Locking inodes from different
527                 // filesystems could deadlock, so we must error out
528                 // now.
529                 return ErrInvalidArgument
530         }
531
532         // To ensure we can test reliably whether we're about to move
533         // a directory into itself, lock all potential common
534         // ancestors of olddir and newdir.
535         needLock := []sync.Locker{}
536         for _, node := range []inode{olddirf.inode, newdirf.inode} {
537                 needLock = append(needLock, node)
538                 for node.Parent() != node && node.Parent().FS() == node.FS() {
539                         node = node.Parent()
540                         needLock = append(needLock, node)
541                 }
542         }
543         locked := map[sync.Locker]bool{}
544         for i := len(needLock) - 1; i >= 0; i-- {
545                 if n := needLock[i]; !locked[n] {
546                         n.Lock()
547                         defer n.Unlock()
548                         locked[n] = true
549                 }
550         }
551
552         _, err = olddirf.inode.Child(oldname, func(oldinode inode) (inode, error) {
553                 if oldinode == nil {
554                         return oldinode, os.ErrNotExist
555                 }
556                 if locked[oldinode] {
557                         // oldinode cannot become a descendant of itself.
558                         return oldinode, ErrInvalidArgument
559                 }
560                 if oldinode.FS() != cfs && newdirf.inode != olddirf.inode {
561                         // moving a mount point to a different parent
562                         // is not (yet) supported.
563                         return oldinode, ErrInvalidArgument
564                 }
565                 accepted, err := newdirf.inode.Child(newname, func(existing inode) (inode, error) {
566                         if existing != nil && existing.IsDir() {
567                                 return existing, ErrIsDirectory
568                         }
569                         return oldinode, nil
570                 })
571                 if err != nil {
572                         // Leave oldinode in olddir.
573                         return oldinode, err
574                 }
575                 accepted.SetParent(newdirf.inode, newname)
576                 return nil, nil
577         })
578         return err
579 }
580
581 func (fs *fileSystem) Remove(name string) error {
582         return fs.remove(strings.TrimRight(name, "/"), false)
583 }
584
585 func (fs *fileSystem) RemoveAll(name string) error {
586         err := fs.remove(strings.TrimRight(name, "/"), true)
587         if os.IsNotExist(err) {
588                 // "If the path does not exist, RemoveAll returns
589                 // nil." (see "os" pkg)
590                 err = nil
591         }
592         return err
593 }
594
595 func (fs *fileSystem) remove(name string, recursive bool) error {
596         dirname, name := path.Split(name)
597         if name == "" || name == "." || name == ".." {
598                 return ErrInvalidArgument
599         }
600         dir, err := rlookup(fs.root, dirname)
601         if err != nil {
602                 return err
603         }
604         dir.Lock()
605         defer dir.Unlock()
606         _, err = dir.Child(name, func(node inode) (inode, error) {
607                 if node == nil {
608                         return nil, os.ErrNotExist
609                 }
610                 if !recursive && node.IsDir() && node.Size() > 0 {
611                         return node, ErrDirectoryNotEmpty
612                 }
613                 return nil, nil
614         })
615         return err
616 }
617
618 func (fs *fileSystem) Sync() error {
619         if syncer, ok := fs.root.(syncer); ok {
620                 return syncer.Sync()
621         }
622         return ErrInvalidOperation
623 }
624
625 func (fs *fileSystem) Flush(string, bool) error {
626         log.Printf("TODO: flush fileSystem")
627         return ErrInvalidOperation
628 }
629
630 func (fs *fileSystem) MemorySize() int64 {
631         return fs.root.MemorySize()
632 }
633
634 // rlookup (recursive lookup) returns the inode for the file/directory
635 // with the given name (which may contain "/" separators). If no such
636 // file/directory exists, the returned node is nil.
637 func rlookup(start inode, path string) (node inode, err error) {
638         node = start
639         for _, name := range strings.Split(path, "/") {
640                 if node.IsDir() {
641                         if name == "." || name == "" {
642                                 continue
643                         }
644                         if name == ".." {
645                                 node = node.Parent()
646                                 continue
647                         }
648                 }
649                 node, err = func() (inode, error) {
650                         node.RLock()
651                         defer node.RUnlock()
652                         return node.Child(name, nil)
653                 }()
654                 if node == nil || err != nil {
655                         break
656                 }
657         }
658         if node == nil && err == nil {
659                 err = os.ErrNotExist
660         }
661         return
662 }
663
664 func permittedName(name string) bool {
665         return name != "" && name != "." && name != ".." && !strings.Contains(name, "/")
666 }