17574: Add BalanceUpdateLimit config, fix tests.
[arvados.git] / sdk / go / arvados / fs_base.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         "net/http"
13         "os"
14         "path"
15         "strings"
16         "sync"
17         "time"
18 )
19
20 var (
21         ErrReadOnlyFile      = errors.New("read-only file")
22         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
23         ErrFileExists        = errors.New("file exists")
24         ErrInvalidOperation  = errors.New("invalid operation")
25         ErrInvalidArgument   = errors.New("invalid argument")
26         ErrDirectoryNotEmpty = errors.New("directory not empty")
27         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
28         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
29         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
30         ErrNotADirectory     = errors.New("not a directory")
31         ErrPermission        = os.ErrPermission
32         DebugLocksPanicMode  = false
33 )
34
35 type syncer interface {
36         Sync() error
37 }
38
39 func debugPanicIfNotLocked(l sync.Locker, writing bool) {
40         if !DebugLocksPanicMode {
41                 return
42         }
43         race := false
44         if rl, ok := l.(interface {
45                 RLock()
46                 RUnlock()
47         }); ok && writing {
48                 go func() {
49                         // Fail if we can grab the read lock during an
50                         // operation that purportedly has write lock.
51                         rl.RLock()
52                         race = true
53                         rl.RUnlock()
54                 }()
55         } else {
56                 go func() {
57                         l.Lock()
58                         race = true
59                         l.Unlock()
60                 }()
61         }
62         time.Sleep(100)
63         if race {
64                 panic("bug: caller-must-have-lock func called, but nobody has lock")
65         }
66 }
67
68 // A File is an *os.File-like interface for reading and writing files
69 // in a FileSystem.
70 type File interface {
71         io.Reader
72         io.Writer
73         io.Closer
74         io.Seeker
75         Size() int64
76         Readdir(int) ([]os.FileInfo, error)
77         Stat() (os.FileInfo, error)
78         Truncate(int64) error
79         Sync() error
80 }
81
82 // A FileSystem is an http.Filesystem plus Stat() and support for
83 // opening writable files. All methods are safe to call from multiple
84 // goroutines.
85 type FileSystem interface {
86         http.FileSystem
87         fsBackend
88
89         rootnode() inode
90
91         // filesystem-wide lock: used by Rename() to prevent deadlock
92         // while locking multiple inodes.
93         locker() sync.Locker
94
95         // throttle for limiting concurrent background writers
96         throttle() *throttle
97
98         // create a new node with nil parent.
99         newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
100
101         // analogous to os.Stat()
102         Stat(name string) (os.FileInfo, error)
103
104         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
105         Create(name string) (File, error)
106
107         // Like os.OpenFile(): create or open a file or directory.
108         //
109         // If flag&os.O_EXCL==0, it opens an existing file or
110         // directory if one exists. If flag&os.O_CREATE!=0, it creates
111         // a new empty file or directory if one does not already
112         // exist.
113         //
114         // When creating a new item, perm&os.ModeDir determines
115         // whether it is a file or a directory.
116         //
117         // A file can be opened multiple times and used concurrently
118         // from multiple goroutines. However, each File object should
119         // be used by only one goroutine at a time.
120         OpenFile(name string, flag int, perm os.FileMode) (File, error)
121
122         Mkdir(name string, perm os.FileMode) error
123         Remove(name string) error
124         RemoveAll(name string) error
125         Rename(oldname, newname string) error
126
127         // Write buffered data from memory to storage, returning when
128         // all updates have been saved to persistent storage.
129         Sync() error
130
131         // Write buffered data from memory to storage, but don't wait
132         // for all writes to finish before returning. If shortBlocks
133         // is true, flush everything; otherwise, if there's less than
134         // a full block of buffered data at the end of a stream, leave
135         // it buffered in memory in case more data can be appended. If
136         // path is "", flush all dirs/streams; otherwise, flush only
137         // the specified dir/stream.
138         Flush(path string, shortBlocks bool) error
139
140         // Estimate current memory usage.
141         MemorySize() int64
142 }
143
144 type inode interface {
145         SetParent(parent inode, name string)
146         Parent() inode
147         FS() FileSystem
148         Read([]byte, filenodePtr) (int, filenodePtr, error)
149         Write([]byte, filenodePtr) (int, filenodePtr, error)
150         Truncate(int64) error
151         IsDir() bool
152         Readdir() ([]os.FileInfo, error)
153         Size() int64
154         FileInfo() os.FileInfo
155
156         // Child() performs lookups and updates of named child nodes.
157         //
158         // (The term "child" here is used strictly. This means name is
159         // not "." or "..", and name does not contain "/".)
160         //
161         // If replace is non-nil, Child calls replace(x) where x is
162         // the current child inode with the given name. If possible,
163         // the child inode is replaced with the one returned by
164         // replace().
165         //
166         // If replace(x) returns an inode (besides x or nil) that is
167         // subsequently returned by Child(), then Child()'s caller
168         // must ensure the new child's name and parent are set/updated
169         // to Child()'s name argument and its receiver respectively.
170         // This is not necessarily done before replace(x) returns, but
171         // it must be done before Child()'s caller releases the
172         // parent's lock.
173         //
174         // Nil represents "no child". replace(nil) signifies that no
175         // child with this name exists yet. If replace() returns nil,
176         // the existing child should be deleted if possible.
177         //
178         // An implementation of Child() is permitted to ignore
179         // replace() or its return value. For example, a regular file
180         // inode does not have children, so Child() always returns
181         // nil.
182         //
183         // Child() returns the child, if any, with the given name: if
184         // a child was added or changed, the new child is returned.
185         //
186         // Caller must have lock (or rlock if replace is nil).
187         Child(name string, replace func(inode) (inode, error)) (inode, error)
188
189         sync.Locker
190         RLock()
191         RUnlock()
192         MemorySize() int64
193 }
194
195 type fileinfo struct {
196         name    string
197         mode    os.FileMode
198         size    int64
199         modTime time.Time
200 }
201
202 // Name implements os.FileInfo.
203 func (fi fileinfo) Name() string {
204         return fi.name
205 }
206
207 // ModTime implements os.FileInfo.
208 func (fi fileinfo) ModTime() time.Time {
209         return fi.modTime
210 }
211
212 // Mode implements os.FileInfo.
213 func (fi fileinfo) Mode() os.FileMode {
214         return fi.mode
215 }
216
217 // IsDir implements os.FileInfo.
218 func (fi fileinfo) IsDir() bool {
219         return fi.mode&os.ModeDir != 0
220 }
221
222 // Size implements os.FileInfo.
223 func (fi fileinfo) Size() int64 {
224         return fi.size
225 }
226
227 // Sys implements os.FileInfo.
228 func (fi fileinfo) Sys() interface{} {
229         return nil
230 }
231
232 type nullnode struct{}
233
234 func (*nullnode) Mkdir(string, os.FileMode) error {
235         return ErrInvalidOperation
236 }
237
238 func (*nullnode) Read([]byte, filenodePtr) (int, filenodePtr, error) {
239         return 0, filenodePtr{}, ErrInvalidOperation
240 }
241
242 func (*nullnode) Write([]byte, filenodePtr) (int, filenodePtr, error) {
243         return 0, filenodePtr{}, ErrInvalidOperation
244 }
245
246 func (*nullnode) Truncate(int64) error {
247         return ErrInvalidOperation
248 }
249
250 func (*nullnode) FileInfo() os.FileInfo {
251         return fileinfo{}
252 }
253
254 func (*nullnode) IsDir() bool {
255         return false
256 }
257
258 func (*nullnode) Readdir() ([]os.FileInfo, error) {
259         return nil, ErrInvalidOperation
260 }
261
262 func (*nullnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
263         return nil, ErrNotADirectory
264 }
265
266 func (*nullnode) MemorySize() int64 {
267         // Types that embed nullnode should report their own size, but
268         // if they don't, we at least report a non-zero size to ensure
269         // a large tree doesn't get reported as 0 bytes.
270         return 64
271 }
272
273 type treenode struct {
274         fs       FileSystem
275         parent   inode
276         inodes   map[string]inode
277         fileinfo fileinfo
278         sync.RWMutex
279         nullnode
280 }
281
282 func (n *treenode) FS() FileSystem {
283         return n.fs
284 }
285
286 func (n *treenode) SetParent(p inode, name string) {
287         n.Lock()
288         defer n.Unlock()
289         n.parent = p
290         n.fileinfo.name = name
291 }
292
293 func (n *treenode) Parent() inode {
294         n.RLock()
295         defer n.RUnlock()
296         return n.parent
297 }
298
299 func (n *treenode) IsDir() bool {
300         return true
301 }
302
303 func (n *treenode) Child(name string, replace func(inode) (inode, error)) (child inode, err error) {
304         debugPanicIfNotLocked(n, false)
305         child = n.inodes[name]
306         if name == "" || name == "." || name == ".." {
307                 err = ErrInvalidArgument
308                 return
309         }
310         if replace == nil {
311                 return
312         }
313         newchild, err := replace(child)
314         if err != nil {
315                 return
316         }
317         if newchild == nil {
318                 debugPanicIfNotLocked(n, true)
319                 delete(n.inodes, name)
320         } else if newchild != child {
321                 debugPanicIfNotLocked(n, true)
322                 n.inodes[name] = newchild
323                 n.fileinfo.modTime = time.Now()
324                 child = newchild
325         }
326         return
327 }
328
329 func (n *treenode) Size() int64 {
330         return n.FileInfo().Size()
331 }
332
333 func (n *treenode) FileInfo() os.FileInfo {
334         n.Lock()
335         defer n.Unlock()
336         n.fileinfo.size = int64(len(n.inodes))
337         return n.fileinfo
338 }
339
340 func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
341         n.RLock()
342         defer n.RUnlock()
343         fi = make([]os.FileInfo, 0, len(n.inodes))
344         for _, inode := range n.inodes {
345                 fi = append(fi, inode.FileInfo())
346         }
347         return
348 }
349
350 func (n *treenode) Sync() error {
351         n.RLock()
352         defer n.RUnlock()
353         for _, inode := range n.inodes {
354                 syncer, ok := inode.(syncer)
355                 if !ok {
356                         return ErrInvalidOperation
357                 }
358                 err := syncer.Sync()
359                 if err != nil {
360                         return err
361                 }
362         }
363         return nil
364 }
365
366 func (n *treenode) MemorySize() (size int64) {
367         n.RLock()
368         defer n.RUnlock()
369         debugPanicIfNotLocked(n, false)
370         for _, inode := range n.inodes {
371                 size += inode.MemorySize()
372         }
373         return
374 }
375
376 type fileSystem struct {
377         root inode
378         fsBackend
379         mutex sync.Mutex
380         thr   *throttle
381 }
382
383 func (fs *fileSystem) rootnode() inode {
384         return fs.root
385 }
386
387 func (fs *fileSystem) throttle() *throttle {
388         return fs.thr
389 }
390
391 func (fs *fileSystem) locker() sync.Locker {
392         return &fs.mutex
393 }
394
395 // OpenFile is analogous to os.OpenFile().
396 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
397         return fs.openFile(name, flag, perm)
398 }
399
400 func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
401         if flag&os.O_SYNC != 0 {
402                 return nil, ErrSyncNotSupported
403         }
404         dirname, name := path.Split(name)
405         parent, err := rlookup(fs.root, dirname)
406         if err != nil {
407                 return nil, err
408         }
409         var readable, writable bool
410         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
411         case os.O_RDWR:
412                 readable = true
413                 writable = true
414         case os.O_RDONLY:
415                 readable = true
416         case os.O_WRONLY:
417                 writable = true
418         default:
419                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
420         }
421         if !writable && parent.IsDir() {
422                 // A directory can be opened via "foo/", "foo/.", or
423                 // "foo/..".
424                 switch name {
425                 case ".", "":
426                         return &filehandle{inode: parent}, nil
427                 case "..":
428                         return &filehandle{inode: parent.Parent()}, nil
429                 }
430         }
431         createMode := flag&os.O_CREATE != 0
432         // We always need to take Lock() here, not just RLock(). Even
433         // if we know we won't be creating a file, parent might be a
434         // lookupnode, which sometimes populates its inodes map during
435         // a Child() call.
436         parent.Lock()
437         defer parent.Unlock()
438         n, err := parent.Child(name, nil)
439         if err != nil {
440                 return nil, err
441         } else if n == nil {
442                 if !createMode {
443                         return nil, os.ErrNotExist
444                 }
445                 n, err = parent.Child(name, func(inode) (repl inode, err error) {
446                         repl, err = parent.FS().newNode(name, perm|0755, time.Now())
447                         if err != nil {
448                                 return
449                         }
450                         repl.SetParent(parent, name)
451                         return
452                 })
453                 if err != nil {
454                         return nil, err
455                 } else if n == nil {
456                         // Parent rejected new child, but returned no error
457                         return nil, ErrInvalidArgument
458                 }
459         } else if flag&os.O_EXCL != 0 {
460                 return nil, ErrFileExists
461         } else if flag&os.O_TRUNC != 0 {
462                 if !writable {
463                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
464                 } else if n.IsDir() {
465                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
466                 } else if err := n.Truncate(0); err != nil {
467                         return nil, err
468                 }
469         }
470         return &filehandle{
471                 inode:    n,
472                 append:   flag&os.O_APPEND != 0,
473                 readable: readable,
474                 writable: writable,
475         }, nil
476 }
477
478 func (fs *fileSystem) Open(name string) (http.File, error) {
479         return fs.OpenFile(name, os.O_RDONLY, 0)
480 }
481
482 func (fs *fileSystem) Create(name string) (File, error) {
483         return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
484 }
485
486 func (fs *fileSystem) Mkdir(name string, perm os.FileMode) error {
487         dirname, name := path.Split(name)
488         n, err := rlookup(fs.root, dirname)
489         if err != nil {
490                 return err
491         }
492         n.Lock()
493         defer n.Unlock()
494         if child, err := n.Child(name, nil); err != nil {
495                 return err
496         } else if child != nil {
497                 return os.ErrExist
498         }
499
500         _, err = n.Child(name, func(inode) (repl inode, err error) {
501                 repl, err = n.FS().newNode(name, perm|os.ModeDir, time.Now())
502                 if err != nil {
503                         return
504                 }
505                 repl.SetParent(n, name)
506                 return
507         })
508         return err
509 }
510
511 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
512         node, err := rlookup(fs.root, name)
513         if err != nil {
514                 return nil, err
515         }
516         return node.FileInfo(), nil
517 }
518
519 func (fs *fileSystem) Rename(oldname, newname string) error {
520         olddir, oldname := path.Split(oldname)
521         if oldname == "" || oldname == "." || oldname == ".." {
522                 return ErrInvalidArgument
523         }
524         olddirf, err := fs.openFile(olddir+".", os.O_RDONLY, 0)
525         if err != nil {
526                 return fmt.Errorf("%q: %s", olddir, err)
527         }
528         defer olddirf.Close()
529
530         newdir, newname := path.Split(newname)
531         if newname == "." || newname == ".." {
532                 return ErrInvalidArgument
533         } else if newname == "" {
534                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
535                 newname = oldname
536         }
537         newdirf, err := fs.openFile(newdir+".", os.O_RDONLY, 0)
538         if err != nil {
539                 return fmt.Errorf("%q: %s", newdir, err)
540         }
541         defer newdirf.Close()
542
543         // TODO: If the nearest common ancestor ("nca") of olddirf and
544         // newdirf is on a different filesystem than fs, we should
545         // call nca.FS().Rename() instead of proceeding. Until then
546         // it's awkward for filesystems to implement their own Rename
547         // methods effectively: the only one that runs is the one on
548         // the root FileSystem exposed to the caller (webdav, fuse,
549         // etc).
550
551         // When acquiring locks on multiple inodes, avoid deadlock by
552         // locking the entire containing filesystem first.
553         cfs := olddirf.inode.FS()
554         cfs.locker().Lock()
555         defer cfs.locker().Unlock()
556
557         if cfs != newdirf.inode.FS() {
558                 // Moving inodes across filesystems is not (yet)
559                 // supported. Locking inodes from different
560                 // filesystems could deadlock, so we must error out
561                 // now.
562                 return ErrInvalidArgument
563         }
564
565         // To ensure we can test reliably whether we're about to move
566         // a directory into itself, lock all potential common
567         // ancestors of olddir and newdir.
568         needLock := []sync.Locker{}
569         for _, node := range []inode{olddirf.inode, newdirf.inode} {
570                 needLock = append(needLock, node)
571                 for node.Parent() != node && node.Parent().FS() == node.FS() {
572                         node = node.Parent()
573                         needLock = append(needLock, node)
574                 }
575         }
576         locked := map[sync.Locker]bool{}
577         for i := len(needLock) - 1; i >= 0; i-- {
578                 if n := needLock[i]; !locked[n] {
579                         n.Lock()
580                         defer n.Unlock()
581                         locked[n] = true
582                 }
583         }
584
585         _, err = olddirf.inode.Child(oldname, func(oldinode inode) (inode, error) {
586                 if oldinode == nil {
587                         return oldinode, os.ErrNotExist
588                 }
589                 if locked[oldinode] {
590                         // oldinode cannot become a descendant of itself.
591                         return oldinode, ErrInvalidArgument
592                 }
593                 if oldinode.FS() != cfs && newdirf.inode != olddirf.inode {
594                         // moving a mount point to a different parent
595                         // is not (yet) supported.
596                         return oldinode, ErrInvalidArgument
597                 }
598                 accepted, err := newdirf.inode.Child(newname, func(existing inode) (inode, error) {
599                         if existing != nil && existing.IsDir() {
600                                 return existing, ErrIsDirectory
601                         }
602                         return oldinode, nil
603                 })
604                 if err != nil {
605                         // Leave oldinode in olddir.
606                         return oldinode, err
607                 }
608                 accepted.SetParent(newdirf.inode, newname)
609                 return nil, nil
610         })
611         return err
612 }
613
614 func (fs *fileSystem) Remove(name string) error {
615         return fs.remove(strings.TrimRight(name, "/"), false)
616 }
617
618 func (fs *fileSystem) RemoveAll(name string) error {
619         err := fs.remove(strings.TrimRight(name, "/"), true)
620         if os.IsNotExist(err) {
621                 // "If the path does not exist, RemoveAll returns
622                 // nil." (see "os" pkg)
623                 err = nil
624         }
625         return err
626 }
627
628 func (fs *fileSystem) remove(name string, recursive bool) error {
629         dirname, name := path.Split(name)
630         if name == "" || name == "." || name == ".." {
631                 return ErrInvalidArgument
632         }
633         dir, err := rlookup(fs.root, dirname)
634         if err != nil {
635                 return err
636         }
637         dir.Lock()
638         defer dir.Unlock()
639         _, err = dir.Child(name, func(node inode) (inode, error) {
640                 if node == nil {
641                         return nil, os.ErrNotExist
642                 }
643                 if !recursive && node.IsDir() && node.Size() > 0 {
644                         return node, ErrDirectoryNotEmpty
645                 }
646                 return nil, nil
647         })
648         return err
649 }
650
651 func (fs *fileSystem) Sync() error {
652         if syncer, ok := fs.root.(syncer); ok {
653                 return syncer.Sync()
654         }
655         return ErrInvalidOperation
656 }
657
658 func (fs *fileSystem) Flush(string, bool) error {
659         log.Printf("TODO: flush fileSystem")
660         return ErrInvalidOperation
661 }
662
663 func (fs *fileSystem) MemorySize() int64 {
664         return fs.root.MemorySize()
665 }
666
667 // rlookup (recursive lookup) returns the inode for the file/directory
668 // with the given name (which may contain "/" separators). If no such
669 // file/directory exists, the returned node is nil.
670 func rlookup(start inode, path string) (node inode, err error) {
671         node = start
672         for _, name := range strings.Split(path, "/") {
673                 if node.IsDir() {
674                         if name == "." || name == "" {
675                                 continue
676                         }
677                         if name == ".." {
678                                 node = node.Parent()
679                                 continue
680                         }
681                 }
682                 node, err = func() (inode, error) {
683                         node.RLock()
684                         defer node.RUnlock()
685                         return node.Child(name, nil)
686                 }()
687                 if node == nil || err != nil {
688                         break
689                 }
690         }
691         if node == nil && err == nil {
692                 err = os.ErrNotExist
693         }
694         return
695 }
696
697 func permittedName(name string) bool {
698         return name != "" && name != "." && name != ".." && !strings.Contains(name, "/")
699 }