3644: Working on adding timestamps
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14 import arvados.events
15 import re
16 import apiclient
17 import json
18 import logging
19
20 _logger = logging.getLogger('arvados.arvados_fuse')
21
22 from time import time
23 from llfuse import FUSEError
24
25 class FreshBase(object):
26     '''Base class for maintaining fresh/stale state to determine when to update.'''
27     def __init__(self):
28         self._stale = True
29         self._poll = False
30         self._last_update = time()
31         self._poll_time = 60
32
33     # Mark the value as stale
34     def invalidate(self):
35         self._stale = True
36
37     # Test if the entries dict is stale
38     def stale(self):
39         if self._stale:
40             return True
41         if self._poll:
42             return (self._last_update + self._poll_time) < time()
43         return False
44
45     def fresh(self):
46         self._stale = False
47         self._last_update = time()
48
49     def ctime():
50         return 0
51
52     def mtime():
53         return 0
54
55
56 class File(FreshBase):
57     '''Base for file objects.'''
58
59     def __init__(self, parent_inode):
60         super(File, self).__init__()
61         self.inode = None
62         self.parent_inode = parent_inode
63
64     def size(self):
65         return 0
66
67     def readfrom(self, off, size):
68         return ''
69
70
71 class StreamReaderFile(File):
72     '''Wraps a StreamFileReader as a file.'''
73
74     def __init__(self, parent_inode, reader, collection):
75         super(StreamReaderFile, self).__init__(parent_inode)
76         self.reader = reader
77         self.collection = collection
78
79     def size(self):
80         return self.reader.size()
81
82     def readfrom(self, off, size):
83         return self.reader.readfrom(off, size)
84
85     def stale(self):
86         return False
87
88     def ctime():
89         return collection["created_at"]
90
91     def mtime():
92         return collection["modified_at"]
93
94
95 class ObjectFile(File):
96     '''Wraps a dict as a serialized json object.'''
97
98     def __init__(self, parent_inode, contents):
99         super(ObjectFile, self).__init__(parent_inode)
100         self.contentsdict = contents
101         self.uuid = self.contentsdict['uuid']
102         self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
103
104     def size(self):
105         return len(self.contents)
106
107     def readfrom(self, off, size):
108         return self.contents[off:(off+size)]
109
110
111 class Directory(FreshBase):
112     '''Generic directory object, backed by a dict.
113     Consists of a set of entries with the key representing the filename
114     and the value referencing a File or Directory object.
115     '''
116
117     def __init__(self, parent_inode):
118         super(Directory, self).__init__()
119
120         '''parent_inode is the integer inode number'''
121         self.inode = None
122         if not isinstance(parent_inode, int):
123             raise Exception("parent_inode should be an int")
124         self.parent_inode = parent_inode
125         self._entries = {}
126
127     #  Overriden by subclasses to implement logic to update the entries dict
128     #  when the directory is stale
129     def update(self):
130         pass
131
132     # Only used when computing the size of the disk footprint of the directory
133     # (stub)
134     def size(self):
135         return 0
136
137     def checkupdate(self):
138         if self.stale():
139             try:
140                 self.update()
141             except apiclient.errors.HttpError as e:
142                 _logger.debug(e)
143
144     def __getitem__(self, item):
145         self.checkupdate()
146         return self._entries[item]
147
148     def items(self):
149         self.checkupdate()
150         return self._entries.items()
151
152     def __iter__(self):
153         self.checkupdate()
154         return self._entries.iterkeys()
155
156     def __contains__(self, k):
157         self.checkupdate()
158         return k in self._entries
159
160     def merge(self, items, fn, same, new_entry):
161         '''Helper method for updating the contents of the directory.
162
163         items: array with new directory contents
164
165         fn: function to take an entry in 'items' and return the desired file or
166         directory name
167
168         same: function to compare an existing entry with an entry in the items
169         list to determine whether to keep the existing entry.
170
171         new_entry: function to create a new directory entry from array entry.
172         '''
173
174         oldentries = self._entries
175         self._entries = {}
176         for i in items:
177             n = fn(i)
178             if n in oldentries and same(oldentries[n], i):
179                 self._entries[n] = oldentries[n]
180                 del oldentries[n]
181             else:
182                 ent = new_entry(i)
183                 if ent is not None:
184                     self._entries[n] = self.inodes.add_entry(ent)
185         for n in oldentries:
186             llfuse.invalidate_entry(self.inode, str(n))
187             self.inodes.del_entry(oldentries[n])
188         self.fresh()
189
190
191 class CollectionDirectory(Directory):
192     '''Represents the root of a directory tree holding a collection.'''
193
194     def __init__(self, parent_inode, inodes, collection_locator):
195         super(CollectionDirectory, self).__init__(parent_inode)
196         self.inodes = inodes
197         self.collection_locator = collection_locator
198
199     def same(self, i):
200         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
201
202     def update(self):
203         try:
204             collection = arvados.CollectionReader(self.collection_locator)
205             for s in collection.all_streams():
206                 cwd = self
207                 for part in s.name().split('/'):
208                     if part != '' and part != '.':
209                         if part not in cwd._entries:
210                             cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
211                         cwd = cwd._entries[part]
212                 for k, v in s.files().items():
213                     cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
214             self.fresh()
215             return True
216         except Exception as detail:
217             _logger.debug("arv-mount %s: error: %s",
218                           self.collection_locator, detail)
219             return False
220
221 class MagicDirectory(Directory):
222     '''A special directory that logically contains the set of all extant keep
223     locators.  When a file is referenced by lookup(), it is tested to see if it
224     is a valid keep locator to a manifest, and if so, loads the manifest
225     contents as a subdirectory of this directory with the locator as the
226     directory name.  Since querying a list of all extant keep locators is
227     impractical, only collections that have already been accessed are visible
228     to readdir().
229     '''
230
231     def __init__(self, parent_inode, inodes):
232         super(MagicDirectory, self).__init__(parent_inode)
233         self.inodes = inodes
234
235     def __contains__(self, k):
236         if k in self._entries:
237             return True
238         try:
239             e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
240             if e.update():
241                 self._entries[k] = e
242                 return True
243             else:
244                 return False
245         except Exception as e:
246             _logger.debug('arv-mount exception keep %s', e)
247             return False
248
249     def __getitem__(self, item):
250         if item in self:
251             return self._entries[item]
252         else:
253             raise KeyError("No collection with id " + item)
254
255 class RecursiveInvalidateDirectory(Directory):
256     def invalidate(self):
257         try:
258             if self.parent_inode == llfuse.ROOT_INODE:
259                 llfuse.lock.acquire()
260             super(RecursiveInvalidateDirectory, self).invalidate()
261             for a in self._entries:
262                 self._entries[a].invalidate()
263         finally:
264             if self.parent_inode == llfuse.ROOT_INODE:
265                 llfuse.lock.release()
266
267 class TagsDirectory(RecursiveInvalidateDirectory):
268     '''A special directory that contains as subdirectories all tags visible to the user.'''
269
270     def __init__(self, parent_inode, inodes, api, poll_time=60):
271         super(TagsDirectory, self).__init__(parent_inode)
272         self.inodes = inodes
273         self.api = api
274         try:
275             arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
276         except:
277             self._poll = True
278             self._poll_time = poll_time
279
280     def update(self):
281         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
282         if "items" in tags:
283             self.merge(tags['items'],
284                        lambda i: i['name'] if 'name' in i else i['uuid'],
285                        lambda a, i: a.tag == i,
286                        lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
287
288 class TagDirectory(Directory):
289     '''A special directory that contains as subdirectories all collections visible
290     to the user that are tagged with a particular tag.
291     '''
292
293     def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
294         super(TagDirectory, self).__init__(parent_inode)
295         self.inodes = inodes
296         self.api = api
297         self.tag = tag
298         self._poll = poll
299         self._poll_time = poll_time
300
301     def update(self):
302         taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
303                                                ['name', '=', self.tag],
304                                                ['head_uuid', 'is_a', 'arvados#collection']],
305                                       select=['head_uuid']).execute()
306         self.merge(taggedcollections['items'],
307                    lambda i: i['head_uuid'],
308                    lambda a, i: a.collection_locator == i['head_uuid'],
309                    lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
310
311
312 class ProjectDirectory(RecursiveInvalidateDirectory):
313     '''A special directory that contains the contents of a project.'''
314
315     def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
316         super(ProjectDirectory, self).__init__(parent_inode)
317         self.inodes = inodes
318         self.api = api
319         self.uuid = uuid['uuid']
320
321         if parent_inode == llfuse.ROOT_INODE:
322             try:
323                 arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
324             except:
325                 self._poll = True
326                 self._poll_time = poll_time
327         else:
328             self._poll = poll
329             self._poll_time = poll_time
330
331
332     def createDirectory(self, i):
333         if re.match(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}', i['uuid']) and i['name'] is not None:
334             return CollectionDirectory(self.inode, self.inodes, i['uuid'])
335         elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
336             return ProjectDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
337         #elif re.match(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}', i['uuid']):
338         #    return None
339         #elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
340         #    return ObjectFile(self.parent_inode, i)
341         else:
342             return None
343
344     def contents(self):
345         return arvados.util.all_contents(self.api, self.uuid)
346
347     def update(self):
348         def same(a, i):
349             if isinstance(a, CollectionDirectory):
350                 return a.collection_locator == i['uuid']
351             elif isinstance(a, ProjectDirectory):
352                 return a.uuid == i['uuid']
353             elif isinstance(a, ObjectFile):
354                 return a.uuid == i['uuid'] and not a.stale()
355             return False
356
357         self.merge(self.contents(),
358                    lambda i: i['name'] if 'name' in i and i['name'] is not None and len(i['name']) > 0 else i['uuid'],
359                    same,
360                    self.createDirectory)
361
362
363 class HomeDirectory(ProjectDirectory):
364     '''A special directory that represents the "home" project.'''
365
366     def __init__(self, parent_inode, inodes, api, poll=False, poll_time=60):
367         super(HomeDirectory, self).__init__(parent_inode, inodes, api, api.users().current().execute())
368
369     #def contents(self):
370     #    return self.api.groups().contents(uuid=self.uuid).execute()['items']
371
372 class FileHandle(object):
373     '''Connects a numeric file handle to a File or Directory object that has
374     been opened by the client.'''
375
376     def __init__(self, fh, entry):
377         self.fh = fh
378         self.entry = entry
379
380
381 class Inodes(object):
382     '''Manage the set of inodes.  This is the mapping from a numeric id
383     to a concrete File or Directory object'''
384
385     def __init__(self):
386         self._entries = {}
387         self._counter = llfuse.ROOT_INODE
388
389     def __getitem__(self, item):
390         return self._entries[item]
391
392     def __setitem__(self, key, item):
393         self._entries[key] = item
394
395     def __iter__(self):
396         return self._entries.iterkeys()
397
398     def items(self):
399         return self._entries.items()
400
401     def __contains__(self, k):
402         return k in self._entries
403
404     def add_entry(self, entry):
405         entry.inode = self._counter
406         self._entries[entry.inode] = entry
407         self._counter += 1
408         return entry
409
410     def del_entry(self, entry):
411         llfuse.invalidate_inode(entry.inode)
412         del self._entries[entry.inode]
413
414 class Operations(llfuse.Operations):
415     '''This is the main interface with llfuse.  The methods on this object are
416     called by llfuse threads to service FUSE events to query and read from
417     the file system.
418
419     llfuse has its own global lock which is acquired before calling a request handler,
420     so request handlers do not run concurrently unless the lock is explicitly released
421     with llfuse.lock_released.'''
422
423     def __init__(self, uid, gid):
424         super(Operations, self).__init__()
425
426         self.inodes = Inodes()
427         self.uid = uid
428         self.gid = gid
429
430         # dict of inode to filehandle
431         self._filehandles = {}
432         self._filehandles_counter = 1
433
434         # Other threads that need to wait until the fuse driver
435         # is fully initialized should wait() on this event object.
436         self.initlock = threading.Event()
437
438     def init(self):
439         # Allow threads that are waiting for the driver to be finished
440         # initializing to continue
441         self.initlock.set()
442
443     def access(self, inode, mode, ctx):
444         return True
445
446     def getattr(self, inode):
447         if inode not in self.inodes:
448             raise llfuse.FUSEError(errno.ENOENT)
449
450         e = self.inodes[inode]
451
452         entry = llfuse.EntryAttributes()
453         entry.st_ino = inode
454         entry.generation = 0
455         entry.entry_timeout = 300
456         entry.attr_timeout = 300
457
458         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
459         if isinstance(e, Directory):
460             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
461         else:
462             entry.st_mode |= stat.S_IFREG
463
464         entry.st_nlink = 1
465         entry.st_uid = self.uid
466         entry.st_gid = self.gid
467         entry.st_rdev = 0
468
469         entry.st_size = e.size()
470
471         entry.st_blksize = 1024
472         entry.st_blocks = e.size()/1024
473         if e.size()/1024 != 0:
474             entry.st_blocks += 1
475         entry.st_atime = 0
476         entry.st_mtime = 0
477         entry.st_ctime = 0
478
479         return entry
480
481     def lookup(self, parent_inode, name):
482         _logger.debug("arv-mount lookup: parent_inode %i name %s",
483                       parent_inode, name)
484         inode = None
485
486         if name == '.':
487             inode = parent_inode
488         else:
489             if parent_inode in self.inodes:
490                 p = self.inodes[parent_inode]
491                 if name == '..':
492                     inode = p.parent_inode
493                 elif name in p:
494                     inode = p[name].inode
495
496         if inode != None:
497             return self.getattr(inode)
498         else:
499             raise llfuse.FUSEError(errno.ENOENT)
500
501     def open(self, inode, flags):
502         if inode in self.inodes:
503             p = self.inodes[inode]
504         else:
505             raise llfuse.FUSEError(errno.ENOENT)
506
507         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
508             raise llfuse.FUSEError(errno.EROFS)
509
510         if isinstance(p, Directory):
511             raise llfuse.FUSEError(errno.EISDIR)
512
513         fh = self._filehandles_counter
514         self._filehandles_counter += 1
515         self._filehandles[fh] = FileHandle(fh, p)
516         return fh
517
518     def read(self, fh, off, size):
519         _logger.debug("arv-mount read %i %i %i", fh, off, size)
520         if fh in self._filehandles:
521             handle = self._filehandles[fh]
522         else:
523             raise llfuse.FUSEError(errno.EBADF)
524
525         try:
526             with llfuse.lock_released:
527                 return handle.entry.readfrom(off, size)
528         except:
529             raise llfuse.FUSEError(errno.EIO)
530
531     def release(self, fh):
532         if fh in self._filehandles:
533             del self._filehandles[fh]
534
535     def opendir(self, inode):
536         _logger.debug("arv-mount opendir: inode %i", inode)
537
538         if inode in self.inodes:
539             p = self.inodes[inode]
540         else:
541             raise llfuse.FUSEError(errno.ENOENT)
542
543         if not isinstance(p, Directory):
544             raise llfuse.FUSEError(errno.ENOTDIR)
545
546         fh = self._filehandles_counter
547         self._filehandles_counter += 1
548         if p.parent_inode in self.inodes:
549             parent = self.inodes[p.parent_inode]
550         else:
551             raise llfuse.FUSEError(errno.EIO)
552
553         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
554         return fh
555
556     def readdir(self, fh, off):
557         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
558
559         if fh in self._filehandles:
560             handle = self._filehandles[fh]
561         else:
562             raise llfuse.FUSEError(errno.EBADF)
563
564         _logger.debug("arv-mount handle.entry %s", handle.entry)
565
566         e = off
567         while e < len(handle.entry):
568             if handle.entry[e][1].inode in self.inodes:
569                 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
570             e += 1
571
572     def releasedir(self, fh):
573         del self._filehandles[fh]
574
575     def statfs(self):
576         st = llfuse.StatvfsData()
577         st.f_bsize = 1024 * 1024
578         st.f_blocks = 0
579         st.f_files = 0
580
581         st.f_bfree = 0
582         st.f_bavail = 0
583
584         st.f_ffree = 0
585         st.f_favail = 0
586
587         st.f_frsize = 0
588         return st
589
590     # The llfuse documentation recommends only overloading functions that
591     # are actually implemented, as the default implementation will raise ENOSYS.
592     # However, there is a bug in the llfuse default implementation of create()
593     # "create() takes exactly 5 positional arguments (6 given)" which will crash
594     # arv-mount.
595     # The workaround is to implement it with the proper number of parameters,
596     # and then everything works out.
597     def create(self, p1, p2, p3, p4, p5):
598         raise llfuse.FUSEError(errno.EROFS)