15652: Only flush the current dir, and only once per 64 MiB.
[arvados.git] / sdk / go / arvados / fs_base.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         "net/http"
13         "os"
14         "path"
15         "strings"
16         "sync"
17         "time"
18 )
19
20 var (
21         ErrReadOnlyFile      = errors.New("read-only file")
22         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
23         ErrFileExists        = errors.New("file exists")
24         ErrInvalidOperation  = errors.New("invalid operation")
25         ErrInvalidArgument   = errors.New("invalid argument")
26         ErrDirectoryNotEmpty = errors.New("directory not empty")
27         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
28         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
29         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
30         ErrNotADirectory     = errors.New("not a directory")
31         ErrPermission        = os.ErrPermission
32 )
33
34 // A File is an *os.File-like interface for reading and writing files
35 // in a FileSystem.
36 type File interface {
37         io.Reader
38         io.Writer
39         io.Closer
40         io.Seeker
41         Size() int64
42         Readdir(int) ([]os.FileInfo, error)
43         Stat() (os.FileInfo, error)
44         Truncate(int64) error
45         Sync() error
46 }
47
48 // A FileSystem is an http.Filesystem plus Stat() and support for
49 // opening writable files. All methods are safe to call from multiple
50 // goroutines.
51 type FileSystem interface {
52         http.FileSystem
53         fsBackend
54
55         rootnode() inode
56
57         // filesystem-wide lock: used by Rename() to prevent deadlock
58         // while locking multiple inodes.
59         locker() sync.Locker
60
61         // create a new node with nil parent.
62         newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
63
64         // analogous to os.Stat()
65         Stat(name string) (os.FileInfo, error)
66
67         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
68         Create(name string) (File, error)
69
70         // Like os.OpenFile(): create or open a file or directory.
71         //
72         // If flag&os.O_EXCL==0, it opens an existing file or
73         // directory if one exists. If flag&os.O_CREATE!=0, it creates
74         // a new empty file or directory if one does not already
75         // exist.
76         //
77         // When creating a new item, perm&os.ModeDir determines
78         // whether it is a file or a directory.
79         //
80         // A file can be opened multiple times and used concurrently
81         // from multiple goroutines. However, each File object should
82         // be used by only one goroutine at a time.
83         OpenFile(name string, flag int, perm os.FileMode) (File, error)
84
85         Mkdir(name string, perm os.FileMode) error
86         Remove(name string) error
87         RemoveAll(name string) error
88         Rename(oldname, newname string) error
89
90         // Write buffered data from memory to storage, returning when
91         // all updates have been saved to persistent storage.
92         Sync() error
93
94         // Write buffered data from memory to storage, but don't wait
95         // for all writes to finish before returning. If shortBlocks
96         // is true, flush everything; otherwise, if there's less than
97         // a full block of buffered data at the end of a stream, leave
98         // it buffered in memory in case more data can be appended. If
99         // path is "", flush all dirs/streams; otherwise, flush only
100         // the specified dir/stream.
101         Flush(path string, shortBlocks bool) error
102 }
103
104 type inode interface {
105         SetParent(parent inode, name string)
106         Parent() inode
107         FS() FileSystem
108         Read([]byte, filenodePtr) (int, filenodePtr, error)
109         Write([]byte, filenodePtr) (int, filenodePtr, error)
110         Truncate(int64) error
111         IsDir() bool
112         Readdir() ([]os.FileInfo, error)
113         Size() int64
114         FileInfo() os.FileInfo
115
116         // Child() performs lookups and updates of named child nodes.
117         //
118         // (The term "child" here is used strictly. This means name is
119         // not "." or "..", and name does not contain "/".)
120         //
121         // If replace is non-nil, Child calls replace(x) where x is
122         // the current child inode with the given name. If possible,
123         // the child inode is replaced with the one returned by
124         // replace().
125         //
126         // If replace(x) returns an inode (besides x or nil) that is
127         // subsequently returned by Child(), then Child()'s caller
128         // must ensure the new child's name and parent are set/updated
129         // to Child()'s name argument and its receiver respectively.
130         // This is not necessarily done before replace(x) returns, but
131         // it must be done before Child()'s caller releases the
132         // parent's lock.
133         //
134         // Nil represents "no child". replace(nil) signifies that no
135         // child with this name exists yet. If replace() returns nil,
136         // the existing child should be deleted if possible.
137         //
138         // An implementation of Child() is permitted to ignore
139         // replace() or its return value. For example, a regular file
140         // inode does not have children, so Child() always returns
141         // nil.
142         //
143         // Child() returns the child, if any, with the given name: if
144         // a child was added or changed, the new child is returned.
145         //
146         // Caller must have lock (or rlock if replace is nil).
147         Child(name string, replace func(inode) (inode, error)) (inode, error)
148
149         sync.Locker
150         RLock()
151         RUnlock()
152 }
153
154 type fileinfo struct {
155         name    string
156         mode    os.FileMode
157         size    int64
158         modTime time.Time
159 }
160
161 // Name implements os.FileInfo.
162 func (fi fileinfo) Name() string {
163         return fi.name
164 }
165
166 // ModTime implements os.FileInfo.
167 func (fi fileinfo) ModTime() time.Time {
168         return fi.modTime
169 }
170
171 // Mode implements os.FileInfo.
172 func (fi fileinfo) Mode() os.FileMode {
173         return fi.mode
174 }
175
176 // IsDir implements os.FileInfo.
177 func (fi fileinfo) IsDir() bool {
178         return fi.mode&os.ModeDir != 0
179 }
180
181 // Size implements os.FileInfo.
182 func (fi fileinfo) Size() int64 {
183         return fi.size
184 }
185
186 // Sys implements os.FileInfo.
187 func (fi fileinfo) Sys() interface{} {
188         return nil
189 }
190
191 type nullnode struct{}
192
193 func (*nullnode) Mkdir(string, os.FileMode) error {
194         return ErrInvalidOperation
195 }
196
197 func (*nullnode) Read([]byte, filenodePtr) (int, filenodePtr, error) {
198         return 0, filenodePtr{}, ErrInvalidOperation
199 }
200
201 func (*nullnode) Write([]byte, filenodePtr) (int, filenodePtr, error) {
202         return 0, filenodePtr{}, ErrInvalidOperation
203 }
204
205 func (*nullnode) Truncate(int64) error {
206         return ErrInvalidOperation
207 }
208
209 func (*nullnode) FileInfo() os.FileInfo {
210         return fileinfo{}
211 }
212
213 func (*nullnode) IsDir() bool {
214         return false
215 }
216
217 func (*nullnode) Readdir() ([]os.FileInfo, error) {
218         return nil, ErrInvalidOperation
219 }
220
221 func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
222         return nil, ErrNotADirectory
223 }
224
225 type treenode struct {
226         fs       FileSystem
227         parent   inode
228         inodes   map[string]inode
229         fileinfo fileinfo
230         sync.RWMutex
231         nullnode
232 }
233
234 func (n *treenode) FS() FileSystem {
235         return n.fs
236 }
237
238 func (n *treenode) SetParent(p inode, name string) {
239         n.Lock()
240         defer n.Unlock()
241         n.parent = p
242         n.fileinfo.name = name
243 }
244
245 func (n *treenode) Parent() inode {
246         n.RLock()
247         defer n.RUnlock()
248         return n.parent
249 }
250
251 func (n *treenode) IsDir() bool {
252         return true
253 }
254
255 func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) {
256         child = n.inodes[name]
257         if name == "" || name == "." || name == ".." {
258                 err = ErrInvalidArgument
259                 return
260         }
261         if replace == nil {
262                 return
263         }
264         newchild, err := replace(child)
265         if err != nil {
266                 return
267         }
268         if newchild == nil {
269                 delete(n.inodes, name)
270         } else if newchild != child {
271                 n.inodes[name] = newchild
272                 n.fileinfo.modTime = time.Now()
273                 child = newchild
274         }
275         return
276 }
277
278 func (n *treenode) Size() int64 {
279         return n.FileInfo().Size()
280 }
281
282 func (n *treenode) FileInfo() os.FileInfo {
283         n.Lock()
284         defer n.Unlock()
285         n.fileinfo.size = int64(len(n.inodes))
286         return n.fileinfo
287 }
288
289 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
290         n.RLock()
291         defer n.RUnlock()
292         fi = make([]os.FileInfo, 0, len(n.inodes))
293         for _, inode := range n.inodes {
294                 fi = append(fi, inode.FileInfo())
295         }
296         return
297 }
298
299 type fileSystem struct {
300         root inode
301         fsBackend
302         mutex sync.Mutex
303 }
304
305 func (fs *fileSystem) rootnode() inode {
306         return fs.root
307 }
308
309 func (fs *fileSystem) locker() sync.Locker {
310         return &fs.mutex
311 }
312
313 // OpenFile is analogous to os.OpenFile().
314 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
315         return fs.openFile(name, flag, perm)
316 }
317
318 func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
319         if flag&os.O_SYNC != 0 {
320                 return nil, ErrSyncNotSupported
321         }
322         dirname, name := path.Split(name)
323         parent, err := rlookup(fs.root, dirname)
324         if err != nil {
325                 return nil, err
326         }
327         var readable, writable bool
328         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
329         case os.O_RDWR:
330                 readable = true
331                 writable = true
332         case os.O_RDONLY:
333                 readable = true
334         case os.O_WRONLY:
335                 writable = true
336         default:
337                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
338         }
339         if !writable && parent.IsDir() {
340                 // A directory can be opened via "foo/", "foo/.", or
341                 // "foo/..".
342                 switch name {
343                 case ".", "":
344                         return &filehandle{inode: parent}, nil
345                 case "..":
346                         return &filehandle{inode: parent.Parent()}, nil
347                 }
348         }
349         createMode := flag&os.O_CREATE != 0
350         if createMode {
351                 parent.Lock()
352                 defer parent.Unlock()
353         } else {
354                 parent.RLock()
355                 defer parent.RUnlock()
356         }
357         n, err := parent.Child(name, nil)
358         if err != nil {
359                 return nil, err
360         } else if n == nil {
361                 if !createMode {
362                         return nil, os.ErrNotExist
363                 }
364                 n, err = parent.Child(name, func(inode) (repl inode, err error) {
365                         repl, err = parent.FS().newNode(name, perm|0755, time.Now())
366                         if err != nil {
367                                 return
368                         }
369                         repl.SetParent(parent, name)
370                         return
371                 })
372                 if err != nil {
373                         return nil, err
374                 } else if n == nil {
375                         // Parent rejected new child, but returned no error
376                         return nil, ErrInvalidArgument
377                 }
378         } else if flag&os.O_EXCL != 0 {
379                 return nil, ErrFileExists
380         } else if flag&os.O_TRUNC != 0 {
381                 if !writable {
382                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
383                 } else if n.IsDir() {
384                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
385                 } else if err := n.Truncate(0); err != nil {
386                         return nil, err
387                 }
388         }
389         return &filehandle{
390                 inode:    n,
391                 append:   flag&os.O_APPEND != 0,
392                 readable: readable,
393                 writable: writable,
394         }, nil
395 }
396
397 func (fs *fileSystem) Open(name string) (http.File, error) {
398         return fs.OpenFile(name, os.O_RDONLY, 0)
399 }
400
401 func (fs *fileSystem) Create(name string) (File, error) {
402         return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
403 }
404
405 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
406         dirname, name := path.Split(name)
407         n, err := rlookup(fs.root, dirname)
408         if err != nil {
409                 return err
410         }
411         n.Lock()
412         defer n.Unlock()
413         if child, err := n.Child(name, nil); err != nil {
414                 return err
415         } else if child != nil {
416                 return os.ErrExist
417         }
418
419         _, err = n.Child(name, func(inode) (repl inode, err error) {
420                 repl, err = n.FS().newNode(name, perm|os.ModeDir, time.Now())
421                 if err != nil {
422                         return
423                 }
424                 repl.SetParent(n, name)
425                 return
426         })
427         return err
428 }
429
430 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
431         node, err := rlookup(fs.root, name)
432         if err != nil {
433                 return nil, err
434         }
435         return node.FileInfo(), nil
436 }
437
438 func (fs *fileSystem) Rename(oldname, newname string) error {
439         olddir, oldname := path.Split(oldname)
440         if oldname == "" || oldname == "." || oldname == ".." {
441                 return ErrInvalidArgument
442         }
443         olddirf, err := fs.openFile(olddir+".", os.O_RDONLY, 0)
444         if err != nil {
445                 return fmt.Errorf("%q: %s", olddir, err)
446         }
447         defer olddirf.Close()
448
449         newdir, newname := path.Split(newname)
450         if newname == "." || newname == ".." {
451                 return ErrInvalidArgument
452         } else if newname == "" {
453                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
454                 newname = oldname
455         }
456         newdirf, err := fs.openFile(newdir+".", os.O_RDONLY, 0)
457         if err != nil {
458                 return fmt.Errorf("%q: %s", newdir, err)
459         }
460         defer newdirf.Close()
461
462         // TODO: If the nearest common ancestor ("nca") of olddirf and
463         // newdirf is on a different filesystem than fs, we should
464         // call nca.FS().Rename() instead of proceeding. Until then
465         // it's awkward for filesystems to implement their own Rename
466         // methods effectively: the only one that runs is the one on
467         // the root FileSystem exposed to the caller (webdav, fuse,
468         // etc).
469
470         // When acquiring locks on multiple inodes, avoid deadlock by
471         // locking the entire containing filesystem first.
472         cfs := olddirf.inode.FS()
473         cfs.locker().Lock()
474         defer cfs.locker().Unlock()
475
476         if cfs != newdirf.inode.FS() {
477                 // Moving inodes across filesystems is not (yet)
478                 // supported. Locking inodes from different
479                 // filesystems could deadlock, so we must error out
480                 // now.
481                 return ErrInvalidArgument
482         }
483
484         // To ensure we can test reliably whether we're about to move
485         // a directory into itself, lock all potential common
486         // ancestors of olddir and newdir.
487         needLock := []sync.Locker{}
488         for _, node := range []inode{olddirf.inode, newdirf.inode} {
489                 needLock = append(needLock, node)
490                 for node.Parent() != node && node.Parent().FS() == node.FS() {
491                         node = node.Parent()
492                         needLock = append(needLock, node)
493                 }
494         }
495         locked := map[sync.Locker]bool{}
496         for i := len(needLock) - 1; i >= 0; i-- {
497                 if n := needLock[i]; !locked[n] {
498                         n.Lock()
499                         defer n.Unlock()
500                         locked[n] = true
501                 }
502         }
503
504         _, err = olddirf.inode.Child(oldname, func(oldinode inode) (inode, error) {
505                 if oldinode == nil {
506                         return oldinode, os.ErrNotExist
507                 }
508                 if locked[oldinode] {
509                         // oldinode cannot become a descendant of itself.
510                         return oldinode, ErrInvalidArgument
511                 }
512                 if oldinode.FS() != cfs && newdirf.inode != olddirf.inode {
513                         // moving a mount point to a different parent
514                         // is not (yet) supported.
515                         return oldinode, ErrInvalidArgument
516                 }
517                 accepted, err := newdirf.inode.Child(newname, func(existing inode) (inode, error) {
518                         if existing != nil && existing.IsDir() {
519                                 return existing, ErrIsDirectory
520                         }
521                         return oldinode, nil
522                 })
523                 if err != nil {
524                         // Leave oldinode in olddir.
525                         return oldinode, err
526                 }
527                 accepted.SetParent(newdirf.inode, newname)
528                 return nil, nil
529         })
530         return err
531 }
532
533 func (fs *fileSystem) Remove(name string) error {
534         return fs.remove(strings.TrimRight(name, "/"), false)
535 }
536
537 func (fs *fileSystem) RemoveAll(name string) error {
538         err := fs.remove(strings.TrimRight(name, "/"), true)
539         if os.IsNotExist(err) {
540                 // "If the path does not exist, RemoveAll returns
541                 // nil." (see "os" pkg)
542                 err = nil
543         }
544         return err
545 }
546
547 func (fs *fileSystem) remove(name string, recursive bool) error {
548         dirname, name := path.Split(name)
549         if name == "" || name == "." || name == ".." {
550                 return ErrInvalidArgument
551         }
552         dir, err := rlookup(fs.root, dirname)
553         if err != nil {
554                 return err
555         }
556         dir.Lock()
557         defer dir.Unlock()
558         _, err = dir.Child(name, func(node inode) (inode, error) {
559                 if node == nil {
560                         return nil, os.ErrNotExist
561                 }
562                 if !recursive && node.IsDir() && node.Size() > 0 {
563                         return node, ErrDirectoryNotEmpty
564                 }
565                 return nil, nil
566         })
567         return err
568 }
569
570 func (fs *fileSystem) Sync() error {
571         log.Printf("TODO: sync fileSystem")
572         return ErrInvalidOperation
573 }
574
575 func (fs *fileSystem) Flush(string, bool) error {
576         log.Printf("TODO: flush fileSystem")
577         return ErrInvalidOperation
578 }
579
580 // rlookup (recursive lookup) returns the inode for the file/directory
581 // with the given name (which may contain "/" separators). If no such
582 // file/directory exists, the returned node is nil.
583 func rlookup(start inode, path string) (node inode, err error) {
584         node = start
585         for _, name := range strings.Split(path, "/") {
586                 if node.IsDir() {
587                         if name == "." || name == "" {
588                                 continue
589                         }
590                         if name == ".." {
591                                 node = node.Parent()
592                                 continue
593                         }
594                 }
595                 node, err = func() (inode, error) {
596                         node.RLock()
597                         defer node.RUnlock()
598                         return node.Child(name, nil)
599                 }()
600                 if node == nil || err != nil {
601                         break
602                 }
603         }
604         if node == nil && err == nil {
605                 err = os.ErrNotExist
606         }
607         return
608 }
609
610 func permittedName(name string) bool {
611         return name != "" && name != "." && name != ".." && !strings.Contains(name, "/")
612 }