15652: Add collectionfs Flush() method to control memory use.
[arvados.git] / sdk / go / arvados / fs_base.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         "net/http"
13         "os"
14         "path"
15         "strings"
16         "sync"
17         "time"
18 )
19
20 var (
21         ErrReadOnlyFile      = errors.New("read-only file")
22         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
23         ErrFileExists        = errors.New("file exists")
24         ErrInvalidOperation  = errors.New("invalid operation")
25         ErrInvalidArgument   = errors.New("invalid argument")
26         ErrDirectoryNotEmpty = errors.New("directory not empty")
27         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
28         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
29         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
30         ErrNotADirectory     = errors.New("not a directory")
31         ErrPermission        = os.ErrPermission
32 )
33
34 // A File is an *os.File-like interface for reading and writing files
35 // in a FileSystem.
36 type File interface {
37         io.Reader
38         io.Writer
39         io.Closer
40         io.Seeker
41         Size() int64
42         Readdir(int) ([]os.FileInfo, error)
43         Stat() (os.FileInfo, error)
44         Truncate(int64) error
45         Sync() error
46 }
47
48 // A FileSystem is an http.Filesystem plus Stat() and support for
49 // opening writable files. All methods are safe to call from multiple
50 // goroutines.
51 type FileSystem interface {
52         http.FileSystem
53         fsBackend
54
55         rootnode() inode
56
57         // filesystem-wide lock: used by Rename() to prevent deadlock
58         // while locking multiple inodes.
59         locker() sync.Locker
60
61         // create a new node with nil parent.
62         newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
63
64         // analogous to os.Stat()
65         Stat(name string) (os.FileInfo, error)
66
67         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
68         Create(name string) (File, error)
69
70         // Like os.OpenFile(): create or open a file or directory.
71         //
72         // If flag&os.O_EXCL==0, it opens an existing file or
73         // directory if one exists. If flag&os.O_CREATE!=0, it creates
74         // a new empty file or directory if one does not already
75         // exist.
76         //
77         // When creating a new item, perm&os.ModeDir determines
78         // whether it is a file or a directory.
79         //
80         // A file can be opened multiple times and used concurrently
81         // from multiple goroutines. However, each File object should
82         // be used by only one goroutine at a time.
83         OpenFile(name string, flag int, perm os.FileMode) (File, error)
84
85         Mkdir(name string, perm os.FileMode) error
86         Remove(name string) error
87         RemoveAll(name string) error
88         Rename(oldname, newname string) error
89
90         // Write buffered data from memory to storage, returning when
91         // all updates have been saved to persistent storage.
92         Sync() error
93
94         // Write buffered data from memory to storage, but don't wait
95         // for all writes to finish before returning. If shortBlocks
96         // is true, flush everything; otherwise, if there's less than
97         // a full block of buffered data at the end of a stream, leave
98         // it buffered in memory in case more data can be appended.
99         Flush(shortBlocks bool) error
100 }
101
102 type inode interface {
103         SetParent(parent inode, name string)
104         Parent() inode
105         FS() FileSystem
106         Read([]byte, filenodePtr) (int, filenodePtr, error)
107         Write([]byte, filenodePtr) (int, filenodePtr, error)
108         Truncate(int64) error
109         IsDir() bool
110         Readdir() ([]os.FileInfo, error)
111         Size() int64
112         FileInfo() os.FileInfo
113
114         // Child() performs lookups and updates of named child nodes.
115         //
116         // (The term "child" here is used strictly. This means name is
117         // not "." or "..", and name does not contain "/".)
118         //
119         // If replace is non-nil, Child calls replace(x) where x is
120         // the current child inode with the given name. If possible,
121         // the child inode is replaced with the one returned by
122         // replace().
123         //
124         // If replace(x) returns an inode (besides x or nil) that is
125         // subsequently returned by Child(), then Child()'s caller
126         // must ensure the new child's name and parent are set/updated
127         // to Child()'s name argument and its receiver respectively.
128         // This is not necessarily done before replace(x) returns, but
129         // it must be done before Child()'s caller releases the
130         // parent's lock.
131         //
132         // Nil represents "no child". replace(nil) signifies that no
133         // child with this name exists yet. If replace() returns nil,
134         // the existing child should be deleted if possible.
135         //
136         // An implementation of Child() is permitted to ignore
137         // replace() or its return value. For example, a regular file
138         // inode does not have children, so Child() always returns
139         // nil.
140         //
141         // Child() returns the child, if any, with the given name: if
142         // a child was added or changed, the new child is returned.
143         //
144         // Caller must have lock (or rlock if replace is nil).
145         Child(name string, replace func(inode) (inode, error)) (inode, error)
146
147         sync.Locker
148         RLock()
149         RUnlock()
150 }
151
152 type fileinfo struct {
153         name    string
154         mode    os.FileMode
155         size    int64
156         modTime time.Time
157 }
158
159 // Name implements os.FileInfo.
160 func (fi fileinfo) Name() string {
161         return fi.name
162 }
163
164 // ModTime implements os.FileInfo.
165 func (fi fileinfo) ModTime() time.Time {
166         return fi.modTime
167 }
168
169 // Mode implements os.FileInfo.
170 func (fi fileinfo) Mode() os.FileMode {
171         return fi.mode
172 }
173
174 // IsDir implements os.FileInfo.
175 func (fi fileinfo) IsDir() bool {
176         return fi.mode&os.ModeDir != 0
177 }
178
179 // Size implements os.FileInfo.
180 func (fi fileinfo) Size() int64 {
181         return fi.size
182 }
183
184 // Sys implements os.FileInfo.
185 func (fi fileinfo) Sys() interface{} {
186         return nil
187 }
188
189 type nullnode struct{}
190
191 func (*nullnode) Mkdir(string, os.FileMode) error {
192         return ErrInvalidOperation
193 }
194
195 func (*nullnode) Read([]byte, filenodePtr) (int, filenodePtr, error) {
196         return 0, filenodePtr{}, ErrInvalidOperation
197 }
198
199 func (*nullnode) Write([]byte, filenodePtr) (int, filenodePtr, error) {
200         return 0, filenodePtr{}, ErrInvalidOperation
201 }
202
203 func (*nullnode) Truncate(int64) error {
204         return ErrInvalidOperation
205 }
206
207 func (*nullnode) FileInfo() os.FileInfo {
208         return fileinfo{}
209 }
210
211 func (*nullnode) IsDir() bool {
212         return false
213 }
214
215 func (*nullnode) Readdir() ([]os.FileInfo, error) {
216         return nil, ErrInvalidOperation
217 }
218
219 func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
220         return nil, ErrNotADirectory
221 }
222
223 type treenode struct {
224         fs       FileSystem
225         parent   inode
226         inodes   map[string]inode
227         fileinfo fileinfo
228         sync.RWMutex
229         nullnode
230 }
231
232 func (n *treenode) FS() FileSystem {
233         return n.fs
234 }
235
236 func (n *treenode) SetParent(p inode, name string) {
237         n.Lock()
238         defer n.Unlock()
239         n.parent = p
240         n.fileinfo.name = name
241 }
242
243 func (n *treenode) Parent() inode {
244         n.RLock()
245         defer n.RUnlock()
246         return n.parent
247 }
248
249 func (n *treenode) IsDir() bool {
250         return true
251 }
252
253 func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) {
254         child = n.inodes[name]
255         if name == "" || name == "." || name == ".." {
256                 err = ErrInvalidArgument
257                 return
258         }
259         if replace == nil {
260                 return
261         }
262         newchild, err := replace(child)
263         if err != nil {
264                 return
265         }
266         if newchild == nil {
267                 delete(n.inodes, name)
268         } else if newchild != child {
269                 n.inodes[name] = newchild
270                 n.fileinfo.modTime = time.Now()
271                 child = newchild
272         }
273         return
274 }
275
276 func (n *treenode) Size() int64 {
277         return n.FileInfo().Size()
278 }
279
280 func (n *treenode) FileInfo() os.FileInfo {
281         n.Lock()
282         defer n.Unlock()
283         n.fileinfo.size = int64(len(n.inodes))
284         return n.fileinfo
285 }
286
287 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
288         n.RLock()
289         defer n.RUnlock()
290         fi = make([]os.FileInfo, 0, len(n.inodes))
291         for _, inode := range n.inodes {
292                 fi = append(fi, inode.FileInfo())
293         }
294         return
295 }
296
297 type fileSystem struct {
298         root inode
299         fsBackend
300         mutex sync.Mutex
301 }
302
303 func (fs *fileSystem) rootnode() inode {
304         return fs.root
305 }
306
307 func (fs *fileSystem) locker() sync.Locker {
308         return &fs.mutex
309 }
310
311 // OpenFile is analogous to os.OpenFile().
312 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
313         return fs.openFile(name, flag, perm)
314 }
315
316 func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
317         if flag&os.O_SYNC != 0 {
318                 return nil, ErrSyncNotSupported
319         }
320         dirname, name := path.Split(name)
321         parent, err := rlookup(fs.root, dirname)
322         if err != nil {
323                 return nil, err
324         }
325         var readable, writable bool
326         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
327         case os.O_RDWR:
328                 readable = true
329                 writable = true
330         case os.O_RDONLY:
331                 readable = true
332         case os.O_WRONLY:
333                 writable = true
334         default:
335                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
336         }
337         if !writable && parent.IsDir() {
338                 // A directory can be opened via "foo/", "foo/.", or
339                 // "foo/..".
340                 switch name {
341                 case ".", "":
342                         return &filehandle{inode: parent}, nil
343                 case "..":
344                         return &filehandle{inode: parent.Parent()}, nil
345                 }
346         }
347         createMode := flag&os.O_CREATE != 0
348         if createMode {
349                 parent.Lock()
350                 defer parent.Unlock()
351         } else {
352                 parent.RLock()
353                 defer parent.RUnlock()
354         }
355         n, err := parent.Child(name, nil)
356         if err != nil {
357                 return nil, err
358         } else if n == nil {
359                 if !createMode {
360                         return nil, os.ErrNotExist
361                 }
362                 n, err = parent.Child(name, func(inode) (repl inode, err error) {
363                         repl, err = parent.FS().newNode(name, perm|0755, time.Now())
364                         if err != nil {
365                                 return
366                         }
367                         repl.SetParent(parent, name)
368                         return
369                 })
370                 if err != nil {
371                         return nil, err
372                 } else if n == nil {
373                         // Parent rejected new child, but returned no error
374                         return nil, ErrInvalidArgument
375                 }
376         } else if flag&os.O_EXCL != 0 {
377                 return nil, ErrFileExists
378         } else if flag&os.O_TRUNC != 0 {
379                 if !writable {
380                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
381                 } else if n.IsDir() {
382                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
383                 } else if err := n.Truncate(0); err != nil {
384                         return nil, err
385                 }
386         }
387         return &filehandle{
388                 inode:    n,
389                 append:   flag&os.O_APPEND != 0,
390                 readable: readable,
391                 writable: writable,
392         }, nil
393 }
394
395 func (fs *fileSystem) Open(name string) (http.File, error) {
396         return fs.OpenFile(name, os.O_RDONLY, 0)
397 }
398
399 func (fs *fileSystem) Create(name string) (File, error) {
400         return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
401 }
402
403 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
404         dirname, name := path.Split(name)
405         n, err := rlookup(fs.root, dirname)
406         if err != nil {
407                 return err
408         }
409         n.Lock()
410         defer n.Unlock()
411         if child, err := n.Child(name, nil); err != nil {
412                 return err
413         } else if child != nil {
414                 return os.ErrExist
415         }
416
417         _, err = n.Child(name, func(inode) (repl inode, err error) {
418                 repl, err = n.FS().newNode(name, perm|os.ModeDir, time.Now())
419                 if err != nil {
420                         return
421                 }
422                 repl.SetParent(n, name)
423                 return
424         })
425         return err
426 }
427
428 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
429         node, err := rlookup(fs.root, name)
430         if err != nil {
431                 return nil, err
432         }
433         return node.FileInfo(), nil
434 }
435
436 func (fs *fileSystem) Rename(oldname, newname string) error {
437         olddir, oldname := path.Split(oldname)
438         if oldname == "" || oldname == "." || oldname == ".." {
439                 return ErrInvalidArgument
440         }
441         olddirf, err := fs.openFile(olddir+".", os.O_RDONLY, 0)
442         if err != nil {
443                 return fmt.Errorf("%q: %s", olddir, err)
444         }
445         defer olddirf.Close()
446
447         newdir, newname := path.Split(newname)
448         if newname == "." || newname == ".." {
449                 return ErrInvalidArgument
450         } else if newname == "" {
451                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
452                 newname = oldname
453         }
454         newdirf, err := fs.openFile(newdir+".", os.O_RDONLY, 0)
455         if err != nil {
456                 return fmt.Errorf("%q: %s", newdir, err)
457         }
458         defer newdirf.Close()
459
460         // TODO: If the nearest common ancestor ("nca") of olddirf and
461         // newdirf is on a different filesystem than fs, we should
462         // call nca.FS().Rename() instead of proceeding. Until then
463         // it's awkward for filesystems to implement their own Rename
464         // methods effectively: the only one that runs is the one on
465         // the root FileSystem exposed to the caller (webdav, fuse,
466         // etc).
467
468         // When acquiring locks on multiple inodes, avoid deadlock by
469         // locking the entire containing filesystem first.
470         cfs := olddirf.inode.FS()
471         cfs.locker().Lock()
472         defer cfs.locker().Unlock()
473
474         if cfs != newdirf.inode.FS() {
475                 // Moving inodes across filesystems is not (yet)
476                 // supported. Locking inodes from different
477                 // filesystems could deadlock, so we must error out
478                 // now.
479                 return ErrInvalidArgument
480         }
481
482         // To ensure we can test reliably whether we're about to move
483         // a directory into itself, lock all potential common
484         // ancestors of olddir and newdir.
485         needLock := []sync.Locker{}
486         for _, node := range []inode{olddirf.inode, newdirf.inode} {
487                 needLock = append(needLock, node)
488                 for node.Parent() != node && node.Parent().FS() == node.FS() {
489                         node = node.Parent()
490                         needLock = append(needLock, node)
491                 }
492         }
493         locked := map[sync.Locker]bool{}
494         for i := len(needLock) - 1; i >= 0; i-- {
495                 if n := needLock[i]; !locked[n] {
496                         n.Lock()
497                         defer n.Unlock()
498                         locked[n] = true
499                 }
500         }
501
502         _, err = olddirf.inode.Child(oldname, func(oldinode inode) (inode, error) {
503                 if oldinode == nil {
504                         return oldinode, os.ErrNotExist
505                 }
506                 if locked[oldinode] {
507                         // oldinode cannot become a descendant of itself.
508                         return oldinode, ErrInvalidArgument
509                 }
510                 if oldinode.FS() != cfs && newdirf.inode != olddirf.inode {
511                         // moving a mount point to a different parent
512                         // is not (yet) supported.
513                         return oldinode, ErrInvalidArgument
514                 }
515                 accepted, err := newdirf.inode.Child(newname, func(existing inode) (inode, error) {
516                         if existing != nil && existing.IsDir() {
517                                 return existing, ErrIsDirectory
518                         }
519                         return oldinode, nil
520                 })
521                 if err != nil {
522                         // Leave oldinode in olddir.
523                         return oldinode, err
524                 }
525                 accepted.SetParent(newdirf.inode, newname)
526                 return nil, nil
527         })
528         return err
529 }
530
531 func (fs *fileSystem) Remove(name string) error {
532         return fs.remove(strings.TrimRight(name, "/"), false)
533 }
534
535 func (fs *fileSystem) RemoveAll(name string) error {
536         err := fs.remove(strings.TrimRight(name, "/"), true)
537         if os.IsNotExist(err) {
538                 // "If the path does not exist, RemoveAll returns
539                 // nil." (see "os" pkg)
540                 err = nil
541         }
542         return err
543 }
544
545 func (fs *fileSystem) remove(name string, recursive bool) error {
546         dirname, name := path.Split(name)
547         if name == "" || name == "." || name == ".." {
548                 return ErrInvalidArgument
549         }
550         dir, err := rlookup(fs.root, dirname)
551         if err != nil {
552                 return err
553         }
554         dir.Lock()
555         defer dir.Unlock()
556         _, err = dir.Child(name, func(node inode) (inode, error) {
557                 if node == nil {
558                         return nil, os.ErrNotExist
559                 }
560                 if !recursive && node.IsDir() && node.Size() > 0 {
561                         return node, ErrDirectoryNotEmpty
562                 }
563                 return nil, nil
564         })
565         return err
566 }
567
568 func (fs *fileSystem) Sync() error {
569         log.Printf("TODO: sync fileSystem")
570         return ErrInvalidOperation
571 }
572
573 func (fs *fileSystem) Flush(bool) error {
574         log.Printf("TODO: flush fileSystem")
575         return ErrInvalidOperation
576 }
577
578 // rlookup (recursive lookup) returns the inode for the file/directory
579 // with the given name (which may contain "/" separators). If no such
580 // file/directory exists, the returned node is nil.
581 func rlookup(start inode, path string) (node inode, err error) {
582         node = start
583         for _, name := range strings.Split(path, "/") {
584                 if node.IsDir() {
585                         if name == "." || name == "" {
586                                 continue
587                         }
588                         if name == ".." {
589                                 node = node.Parent()
590                                 continue
591                         }
592                 }
593                 node, err = func() (inode, error) {
594                         node.RLock()
595                         defer node.RUnlock()
596                         return node.Child(name, nil)
597                 }()
598                 if node == nil || err != nil {
599                         break
600                 }
601         }
602         if node == nil && err == nil {
603                 err = os.ErrNotExist
604         }
605         return
606 }
607
608 func permittedName(name string) bool {
609         return name != "" && name != "." && name != ".." && !strings.Contains(name, "/")
610 }