20602: Report queue time for pos/neg/zero-priority reqs.
[arvados.git] / lib / mount / fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package mount
6
7 import (
8         "errors"
9         "io"
10         "log"
11         "os"
12         "runtime/debug"
13         "sync"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "git.arvados.org/arvados.git/sdk/go/keepclient"
17         "github.com/arvados/cgofuse/fuse"
18 )
19
20 // sharedFile wraps arvados.File with a sync.Mutex, so fuse can safely
21 // use a single filehandle concurrently on behalf of multiple
22 // threads/processes.
23 type sharedFile struct {
24         arvados.File
25         sync.Mutex
26 }
27
28 // keepFS implements cgofuse's FileSystemInterface.
29 type keepFS struct {
30         fuse.FileSystemBase
31         Client     *arvados.Client
32         KeepClient *keepclient.KeepClient
33         ReadOnly   bool
34         Uid        int
35         Gid        int
36
37         root   arvados.CustomFileSystem
38         open   map[uint64]*sharedFile
39         lastFH uint64
40         sync.RWMutex
41
42         // If non-nil, this channel will be closed by Init() to notify
43         // other goroutines that the mount is ready.
44         ready chan struct{}
45 }
46
47 var (
48         invalidFH = ^uint64(0)
49 )
50
51 // newFH wraps f in a sharedFile, adds it to fs's lookup table using a
52 // new handle number, and returns the handle number.
53 func (fs *keepFS) newFH(f arvados.File) uint64 {
54         fs.Lock()
55         defer fs.Unlock()
56         if fs.open == nil {
57                 fs.open = make(map[uint64]*sharedFile)
58         }
59         fs.lastFH++
60         fh := fs.lastFH
61         fs.open[fh] = &sharedFile{File: f}
62         return fh
63 }
64
65 func (fs *keepFS) lookupFH(fh uint64) *sharedFile {
66         fs.RLock()
67         defer fs.RUnlock()
68         return fs.open[fh]
69 }
70
71 func (fs *keepFS) Init() {
72         defer fs.debugPanics()
73         fs.root = fs.Client.SiteFileSystem(fs.KeepClient)
74         fs.root.MountProject("home", "")
75         if fs.ready != nil {
76                 close(fs.ready)
77         }
78 }
79
80 func (fs *keepFS) Create(path string, flags int, mode uint32) (errc int, fh uint64) {
81         defer fs.debugPanics()
82         if fs.ReadOnly {
83                 return -fuse.EROFS, invalidFH
84         }
85         f, err := fs.root.OpenFile(path, flags|os.O_CREATE, os.FileMode(mode))
86         if err == os.ErrExist {
87                 return -fuse.EEXIST, invalidFH
88         } else if err != nil {
89                 return -fuse.EINVAL, invalidFH
90         }
91         return 0, fs.newFH(f)
92 }
93
94 func (fs *keepFS) Open(path string, flags int) (errc int, fh uint64) {
95         defer fs.debugPanics()
96         if fs.ReadOnly && flags&(os.O_RDWR|os.O_WRONLY|os.O_CREATE) != 0 {
97                 return -fuse.EROFS, invalidFH
98         }
99         f, err := fs.root.OpenFile(path, flags, 0)
100         if err != nil {
101                 return -fuse.ENOENT, invalidFH
102         } else if fi, err := f.Stat(); err != nil {
103                 return -fuse.EIO, invalidFH
104         } else if fi.IsDir() {
105                 f.Close()
106                 return -fuse.EISDIR, invalidFH
107         }
108         return 0, fs.newFH(f)
109 }
110
111 func (fs *keepFS) Utimens(path string, tmsp []fuse.Timespec) int {
112         defer fs.debugPanics()
113         if fs.ReadOnly {
114                 return -fuse.EROFS
115         }
116         f, err := fs.root.OpenFile(path, 0, 0)
117         if err != nil {
118                 return fs.errCode(err)
119         }
120         f.Close()
121         return 0
122 }
123
124 func (fs *keepFS) errCode(err error) int {
125         if err == nil {
126                 return 0
127         }
128         if errors.Is(err, os.ErrNotExist) {
129                 return -fuse.ENOENT
130         }
131         if errors.Is(err, os.ErrExist) {
132                 return -fuse.EEXIST
133         }
134         if errors.Is(err, arvados.ErrInvalidArgument) {
135                 return -fuse.EINVAL
136         }
137         if errors.Is(err, arvados.ErrInvalidOperation) {
138                 return -fuse.ENOSYS
139         }
140         if errors.Is(err, arvados.ErrDirectoryNotEmpty) {
141                 return -fuse.ENOTEMPTY
142         }
143         return -fuse.EIO
144 }
145
146 func (fs *keepFS) Mkdir(path string, mode uint32) int {
147         defer fs.debugPanics()
148         if fs.ReadOnly {
149                 return -fuse.EROFS
150         }
151         f, err := fs.root.OpenFile(path, os.O_CREATE|os.O_EXCL, os.FileMode(mode)|os.ModeDir)
152         if err != nil {
153                 return fs.errCode(err)
154         }
155         f.Close()
156         return 0
157 }
158
159 func (fs *keepFS) Opendir(path string) (errc int, fh uint64) {
160         defer fs.debugPanics()
161         f, err := fs.root.OpenFile(path, 0, 0)
162         if err != nil {
163                 return fs.errCode(err), invalidFH
164         } else if fi, err := f.Stat(); err != nil {
165                 return fs.errCode(err), invalidFH
166         } else if !fi.IsDir() {
167                 f.Close()
168                 return -fuse.ENOTDIR, invalidFH
169         }
170         return 0, fs.newFH(f)
171 }
172
173 func (fs *keepFS) Releasedir(path string, fh uint64) (errc int) {
174         defer fs.debugPanics()
175         return fs.Release(path, fh)
176 }
177
178 func (fs *keepFS) Rmdir(path string) int {
179         defer fs.debugPanics()
180         return fs.errCode(fs.root.Remove(path))
181 }
182
183 func (fs *keepFS) Release(path string, fh uint64) (errc int) {
184         defer fs.debugPanics()
185         fs.Lock()
186         defer fs.Unlock()
187         defer delete(fs.open, fh)
188         if f := fs.open[fh]; f != nil {
189                 err := f.Close()
190                 if err != nil {
191                         return -fuse.EIO
192                 }
193         }
194         return 0
195 }
196
197 func (fs *keepFS) Rename(oldname, newname string) (errc int) {
198         defer fs.debugPanics()
199         if fs.ReadOnly {
200                 return -fuse.EROFS
201         }
202         return fs.errCode(fs.root.Rename(oldname, newname))
203 }
204
205 func (fs *keepFS) Unlink(path string) (errc int) {
206         defer fs.debugPanics()
207         if fs.ReadOnly {
208                 return -fuse.EROFS
209         }
210         return fs.errCode(fs.root.Remove(path))
211 }
212
213 func (fs *keepFS) Truncate(path string, size int64, fh uint64) (errc int) {
214         defer fs.debugPanics()
215         if fs.ReadOnly {
216                 return -fuse.EROFS
217         }
218
219         // Sometimes fh is a valid filehandle and we don't need to
220         // waste a name lookup.
221         if f := fs.lookupFH(fh); f != nil {
222                 return fs.errCode(f.Truncate(size))
223         }
224
225         // Other times, fh is invalid and we need to lookup path.
226         f, err := fs.root.OpenFile(path, os.O_RDWR, 0)
227         if err != nil {
228                 return fs.errCode(err)
229         }
230         defer f.Close()
231         return fs.errCode(f.Truncate(size))
232 }
233
234 func (fs *keepFS) Getattr(path string, stat *fuse.Stat_t, fh uint64) (errc int) {
235         defer fs.debugPanics()
236         var fi os.FileInfo
237         var err error
238         if f := fs.lookupFH(fh); f != nil {
239                 // Valid filehandle -- ignore path.
240                 fi, err = f.Stat()
241         } else {
242                 // Invalid filehandle -- lookup path.
243                 fi, err = fs.root.Stat(path)
244         }
245         if err != nil {
246                 return fs.errCode(err)
247         }
248         fs.fillStat(stat, fi)
249         return 0
250 }
251
252 func (fs *keepFS) Chmod(path string, mode uint32) (errc int) {
253         if fs.ReadOnly {
254                 return -fuse.EROFS
255         }
256         if fi, err := fs.root.Stat(path); err != nil {
257                 return fs.errCode(err)
258         } else if mode & ^uint32(fuse.S_IFREG|fuse.S_IFDIR|0777) != 0 {
259                 // Refuse to set mode bits other than
260                 // regfile/dir/perms
261                 return -fuse.ENOSYS
262         } else if (fi.Mode()&os.ModeDir != 0) != (mode&fuse.S_IFDIR != 0) {
263                 // Refuse to transform a regular file to a dir, or
264                 // vice versa
265                 return -fuse.ENOSYS
266         }
267         // As long as the change isn't nonsense, chmod is a no-op,
268         // because we don't save permission bits.
269         return 0
270 }
271
272 func (fs *keepFS) fillStat(stat *fuse.Stat_t, fi os.FileInfo) {
273         defer fs.debugPanics()
274         var m uint32
275         if fi.IsDir() {
276                 m = m | fuse.S_IFDIR
277         } else {
278                 m = m | fuse.S_IFREG
279         }
280         m = m | uint32(fi.Mode()&os.ModePerm)
281         stat.Mode = m
282         stat.Nlink = 1
283         stat.Size = fi.Size()
284         t := fuse.NewTimespec(fi.ModTime())
285         stat.Mtim = t
286         stat.Ctim = t
287         stat.Atim = t
288         stat.Birthtim = t
289         stat.Blksize = 1024
290         stat.Blocks = (stat.Size + stat.Blksize - 1) / stat.Blksize
291         if fs.Uid > 0 && int64(fs.Uid) < 1<<31 {
292                 stat.Uid = uint32(fs.Uid)
293         }
294         if fs.Gid > 0 && int64(fs.Gid) < 1<<31 {
295                 stat.Gid = uint32(fs.Gid)
296         }
297 }
298
299 func (fs *keepFS) Write(path string, buf []byte, ofst int64, fh uint64) (n int) {
300         defer fs.debugPanics()
301         if fs.ReadOnly {
302                 return -fuse.EROFS
303         }
304         f := fs.lookupFH(fh)
305         if f == nil {
306                 return -fuse.EBADF
307         }
308         f.Lock()
309         defer f.Unlock()
310         if _, err := f.Seek(ofst, io.SeekStart); err != nil {
311                 return fs.errCode(err)
312         }
313         n, err := f.Write(buf)
314         if err != nil {
315                 log.Printf("error writing %q: %s", path, err)
316                 return fs.errCode(err)
317         }
318         return n
319 }
320
321 func (fs *keepFS) Read(path string, buf []byte, ofst int64, fh uint64) (n int) {
322         defer fs.debugPanics()
323         f := fs.lookupFH(fh)
324         if f == nil {
325                 return -fuse.EBADF
326         }
327         f.Lock()
328         defer f.Unlock()
329         if _, err := f.Seek(ofst, io.SeekStart); err != nil {
330                 return fs.errCode(err)
331         }
332         n, err := f.Read(buf)
333         for err == nil && n < len(buf) {
334                 // f is an io.Reader ("If some data is available but
335                 // not len(p) bytes, Read conventionally returns what
336                 // is available instead of waiting for more") -- but
337                 // our caller requires us to either fill buf or reach
338                 // EOF.
339                 done := n
340                 n, err = f.Read(buf[done:])
341                 n += done
342         }
343         if err != nil && err != io.EOF {
344                 log.Printf("error reading %q: %s", path, err)
345                 return fs.errCode(err)
346         }
347         return n
348 }
349
350 func (fs *keepFS) Readdir(path string,
351         fill func(name string, stat *fuse.Stat_t, ofst int64) bool,
352         ofst int64,
353         fh uint64) (errc int) {
354         defer fs.debugPanics()
355         f := fs.lookupFH(fh)
356         if f == nil {
357                 return -fuse.EBADF
358         }
359         fill(".", nil, 0)
360         fill("..", nil, 0)
361         var stat fuse.Stat_t
362         fis, err := f.Readdir(-1)
363         if err != nil {
364                 return fs.errCode(err)
365         }
366         for _, fi := range fis {
367                 fs.fillStat(&stat, fi)
368                 fill(fi.Name(), &stat, 0)
369         }
370         return 0
371 }
372
373 func (fs *keepFS) Fsync(path string, datasync bool, fh uint64) int {
374         defer fs.debugPanics()
375         f := fs.lookupFH(fh)
376         if f == nil {
377                 return -fuse.EBADF
378         }
379         return fs.errCode(f.Sync())
380 }
381
382 func (fs *keepFS) Fsyncdir(path string, datasync bool, fh uint64) int {
383         return fs.Fsync(path, datasync, fh)
384 }
385
386 // debugPanics (when deferred by keepFS handlers) prints an error and
387 // stack trace on stderr when a handler crashes. (Without this,
388 // cgofuse recovers from panics silently and returns EIO.)
389 func (fs *keepFS) debugPanics() {
390         if err := recover(); err != nil {
391                 log.Printf("(%T) %v", err, err)
392                 debug.PrintStack()
393                 panic(err)
394         }
395 }