Merge branch 'master' into 2060-edit-tags-in-workbench
[arvados.git] / sdk / python / arvados / fuse.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import argparse
14 import pprint
15
16 from time import time
17 from llfuse import FUSEError
18
19 class Directory(object):
20     '''Generic directory object, backed by a dict.
21     Consists of a set of entries with the key representing the filename
22     and the value referencing a File or Directory object.
23     '''
24
25     def __init__(self, parent_inode):
26         self.inode = None
27         self.parent_inode = parent_inode
28         self._entries = {}
29
30     def __getitem__(self, item):
31         return self._entries[item]
32
33     def __setitem__(self, key, item):
34         self._entries[key] = item
35
36     def __iter__(self):
37         return self._entries.iterkeys()
38
39     def items(self):
40         return self._entries.items()
41
42     def __contains__(self, k):
43         return k in self._entries
44
45     def size(self):
46         return 0
47
48 class MagicDirectory(Directory):
49     '''A special directory that logically contains the set of all extant
50     keep locators.  When a file is referenced by lookup(), it is tested
51     to see if it is a valid keep locator to a manifest, and if so, loads the manifest
52     contents as a subdirectory of this directory with the locator as the directory name.
53     Since querying a list of all extant keep locators is impractical, only loaded collections 
54     are visible to readdir().'''
55
56     def __init__(self, parent_inode, inodes):
57         super(MagicDirectory, self).__init__(parent_inode)
58         self.inodes = inodes
59
60     def __contains__(self, k):
61         if k in self._entries:
62             return True
63         try:
64             if arvados.Keep.get(k):
65                 return True
66             else:
67                 return False
68         except Exception as e:
69             #print 'exception keep', e
70             return False
71
72     def __getitem__(self, item):
73         if item not in self._entries:
74             collection = arvados.CollectionReader(arvados.Keep.get(item))
75             self._entries[item] = self.inodes.add_entry(Directory(self.inode))
76             self.inodes.load_collection(self._entries[item], collection)
77         return self._entries[item]
78
79 class File(object):
80     '''Wraps a StreamFileReader for use by Directory.'''
81
82     def __init__(self, parent_inode, reader):
83         self.inode = None
84         self.parent_inode = parent_inode
85         self.reader = reader
86
87     def size(self):
88         return self.reader.size()
89
90 class FileHandle(object):
91     '''Connects a numeric file handle to a File or Directory object that has 
92     been opened by the client.'''
93
94     def __init__(self, fh, entry):
95         self.fh = fh
96         self.entry = entry
97
98 class Inodes(object):
99     '''Manage the set of inodes.  This is the mapping from a numeric id 
100     to a concrete File or Directory object'''
101
102     def __init__(self):
103         self._entries = {}
104         self._counter = llfuse.ROOT_INODE
105
106     def __getitem__(self, item):
107         return self._entries[item]
108
109     def __setitem__(self, key, item):
110         self._entries[key] = item
111
112     def __iter__(self):
113         return self._entries.iterkeys()
114
115     def items(self):
116         return self._entries.items()
117
118     def __contains__(self, k):
119         return k in self._entries
120
121     def load_collection(self, parent_dir, collection):
122         '''parent_dir is the Directory object that will be populated by the collection.
123         collection is the arvados.CollectionReader to use as the source'''
124         for s in collection.all_streams():
125             cwd = parent_dir
126             for part in s.name().split('/'):
127                 if part != '' and part != '.':
128                     if part not in cwd:
129                         cwd[part] = self.add_entry(Directory(cwd.inode))
130                     cwd = cwd[part]
131             for k, v in s.files().items():
132                 cwd[k] = self.add_entry(File(cwd.inode, v))
133
134     def add_entry(self, entry):
135         entry.inode = self._counter
136         self._entries[entry.inode] = entry
137         self._counter += 1
138         return entry    
139
140 class Operations(llfuse.Operations):
141     '''This is the main interface with llfuse.  The methods on this object are
142     called by llfuse threads to service FUSE events to query and read from 
143     the file system.
144
145     llfuse has its own global lock which is acquired before calling a request handler,
146     so request handlers do not run concurrently unless the lock is explicitly released 
147     with llfuse.lock_released.'''
148
149     def __init__(self, uid, gid):
150         super(Operations, self).__init__()
151
152         self.inodes = Inodes()
153         self.uid = uid
154         self.gid = gid
155         
156         # dict of inode to filehandle
157         self._filehandles = {}
158         self._filehandles_counter = 1
159
160         # Other threads that need to wait until the fuse driver
161         # is fully initialized should wait() on this event object.
162         self.initlock = threading.Event()
163
164     def init(self):
165         # Allow threads that are waiting for the driver to be finished
166         # initializing to continue
167         self.initlock.set()
168
169     def access(self, inode, mode, ctx):
170         return True
171    
172     def getattr(self, inode):
173         e = self.inodes[inode]
174
175         entry = llfuse.EntryAttributes()
176         entry.st_ino = inode
177         entry.generation = 0
178         entry.entry_timeout = 300
179         entry.attr_timeout = 300
180
181         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
182         if isinstance(e, Directory):
183             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
184         else:
185             entry.st_mode |= stat.S_IFREG
186
187         entry.st_nlink = 1
188         entry.st_uid = self.uid
189         entry.st_gid = self.gid
190         entry.st_rdev = 0
191
192         entry.st_size = e.size()
193
194         entry.st_blksize = 1024
195         entry.st_blocks = e.size()/1024
196         if e.size()/1024 != 0:
197             entry.st_blocks += 1
198         entry.st_atime = 0
199         entry.st_mtime = 0
200         entry.st_ctime = 0
201
202         return entry
203
204     def lookup(self, parent_inode, name):
205         #print "lookup: parent_inode", parent_inode, "name", name
206         inode = None
207
208         if name == '.':
209             inode = parent_inode
210         else:
211             if parent_inode in self.inodes:
212                 p = self.inodes[parent_inode]
213                 if name == '..':
214                     inode = p.parent_inode
215                 elif name in p:
216                     inode = p[name].inode
217
218         if inode != None:
219             return self.getattr(inode)
220         else:
221             raise llfuse.FUSEError(errno.ENOENT)
222    
223     def open(self, inode, flags):
224         if inode in self.inodes:
225             p = self.inodes[inode]
226         else:
227             raise llfuse.FUSEError(errno.ENOENT)
228
229         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
230             raise llfuse.FUSEError(errno.EROFS)
231
232         if isinstance(p, Directory):
233             raise llfuse.FUSEError(errno.EISDIR)
234
235         fh = self._filehandles_counter
236         self._filehandles_counter += 1
237         self._filehandles[fh] = FileHandle(fh, p)
238         return fh
239
240     def read(self, fh, off, size):
241         #print "read", fh, off, size
242         if fh in self._filehandles:
243             handle = self._filehandles[fh]
244         else:
245             raise llfuse.FUSEError(errno.EBADF)
246
247         try:
248             with llfuse.lock_released:
249                 return handle.entry.reader.readfrom(off, size)
250         except:
251             raise llfuse.FUSEError(errno.EIO)
252
253     def release(self, fh):
254         if fh in self._filehandles:
255             del self._filehandles[fh]
256
257     def opendir(self, inode):
258         #print "opendir: inode", inode
259
260         if inode in self.inodes:
261             p = self.inodes[inode]
262         else:
263             raise llfuse.FUSEError(errno.ENOENT)
264
265         if not isinstance(p, Directory):
266             raise llfuse.FUSEError(errno.ENOTDIR)
267
268         fh = self._filehandles_counter
269         self._filehandles_counter += 1
270         if p.parent_inode in self.inodes:
271             parent = self.inodes[p.parent_inode]
272         else:
273             parent = None
274         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
275         return fh
276
277     def readdir(self, fh, off):
278         #print "readdir: fh", fh, "off", off
279
280         if fh in self._filehandles:
281             handle = self._filehandles[fh]
282         else:
283             raise llfuse.FUSEError(errno.EBADF)
284
285         #print "handle.entry", handle.entry
286
287         e = off
288         while e < len(handle.entry):
289             yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
290             e += 1
291
292     def releasedir(self, fh):
293         del self._filehandles[fh]
294
295     def statfs(self):
296         st = llfuse.StatvfsData()
297         st.f_bsize = 1024 * 1024
298         st.f_blocks = 0
299         st.f_files = 0
300
301         st.f_bfree = 0
302         st.f_bavail = 0
303
304         st.f_ffree = 0
305         st.f_favail = 0
306
307         st.f_frsize = 0
308         return st
309
310     # The llfuse documentation recommends only overloading functions that
311     # are actually implemented, as the default implementation will raise ENOSYS.
312     # However, there is a bug in the llfuse default implementation of create()
313     # "create() takes exactly 5 positional arguments (6 given)" which will crash
314     # arv-mount.
315     # The workaround is to implement it with the proper number of parameters,
316     # and then everything works out.
317     def create(self, p1, p2, p3, p4, p5):
318         raise llfuse.FUSEError(errno.EROFS)