Merge branch 'master' into 2221-complete-docker
[arvados.git] / sdk / python / arvados / fuse.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14
15 from time import time
16 from llfuse import FUSEError
17
18 class Directory(object):
19     '''Generic directory object, backed by a dict.
20     Consists of a set of entries with the key representing the filename
21     and the value referencing a File or Directory object.
22     '''
23
24     def __init__(self, parent_inode):
25         self.inode = None
26         self.parent_inode = parent_inode
27         self._entries = {}
28
29     def __getitem__(self, item):
30         return self._entries[item]
31
32     def __setitem__(self, key, item):
33         self._entries[key] = item
34
35     def __iter__(self):
36         return self._entries.iterkeys()
37
38     def items(self):
39         return self._entries.items()
40
41     def __contains__(self, k):
42         return k in self._entries
43
44     def size(self):
45         return 0
46
47 class MagicDirectory(Directory):
48     '''A special directory that logically contains the set of all extant
49     keep locators.  When a file is referenced by lookup(), it is tested
50     to see if it is a valid keep locator to a manifest, and if so, loads the manifest
51     contents as a subdirectory of this directory with the locator as the directory name.
52     Since querying a list of all extant keep locators is impractical, only loaded collections 
53     are visible to readdir().'''
54
55     def __init__(self, parent_inode, inodes):
56         super(MagicDirectory, self).__init__(parent_inode)
57         self.inodes = inodes
58
59     def __contains__(self, k):
60         if k in self._entries:
61             return True
62         try:
63             if arvados.Keep.get(k):
64                 return True
65             else:
66                 return False
67         except Exception as e:
68             #print 'exception keep', e
69             return False
70
71     def __getitem__(self, item):
72         if item not in self._entries:
73             collection = arvados.CollectionReader(arvados.Keep.get(item))
74             self._entries[item] = self.inodes.add_entry(Directory(self.inode))
75             self.inodes.load_collection(self._entries[item], collection)
76         return self._entries[item]
77
78 class File(object):
79     '''Wraps a StreamFileReader for use by Directory.'''
80
81     def __init__(self, parent_inode, reader):
82         self.inode = None
83         self.parent_inode = parent_inode
84         self.reader = reader
85
86     def size(self):
87         return self.reader.size()
88
89 class FileHandle(object):
90     '''Connects a numeric file handle to a File or Directory object that has 
91     been opened by the client.'''
92
93     def __init__(self, fh, entry):
94         self.fh = fh
95         self.entry = entry
96
97 class Inodes(object):
98     '''Manage the set of inodes.  This is the mapping from a numeric id 
99     to a concrete File or Directory object'''
100
101     def __init__(self):
102         self._entries = {}
103         self._counter = llfuse.ROOT_INODE
104
105     def __getitem__(self, item):
106         return self._entries[item]
107
108     def __setitem__(self, key, item):
109         self._entries[key] = item
110
111     def __iter__(self):
112         return self._entries.iterkeys()
113
114     def items(self):
115         return self._entries.items()
116
117     def __contains__(self, k):
118         return k in self._entries
119
120     def load_collection(self, parent_dir, collection):
121         '''parent_dir is the Directory object that will be populated by the collection.
122         collection is the arvados.CollectionReader to use as the source'''
123         for s in collection.all_streams():
124             cwd = parent_dir
125             for part in s.name().split('/'):
126                 if part != '' and part != '.':
127                     if part not in cwd:
128                         cwd[part] = self.add_entry(Directory(cwd.inode))
129                     cwd = cwd[part]
130             for k, v in s.files().items():
131                 cwd[k] = self.add_entry(File(cwd.inode, v))
132
133     def add_entry(self, entry):
134         entry.inode = self._counter
135         self._entries[entry.inode] = entry
136         self._counter += 1
137         return entry    
138
139 class Operations(llfuse.Operations):
140     '''This is the main interface with llfuse.  The methods on this object are
141     called by llfuse threads to service FUSE events to query and read from 
142     the file system.
143
144     llfuse has its own global lock which is acquired before calling a request handler,
145     so request handlers do not run concurrently unless the lock is explicitly released 
146     with llfuse.lock_released.'''
147
148     def __init__(self, uid, gid):
149         super(Operations, self).__init__()
150
151         self.inodes = Inodes()
152         self.uid = uid
153         self.gid = gid
154         
155         # dict of inode to filehandle
156         self._filehandles = {}
157         self._filehandles_counter = 1
158
159         # Other threads that need to wait until the fuse driver
160         # is fully initialized should wait() on this event object.
161         self.initlock = threading.Event()
162
163     def init(self):
164         # Allow threads that are waiting for the driver to be finished
165         # initializing to continue
166         self.initlock.set()
167
168     def access(self, inode, mode, ctx):
169         return True
170    
171     def getattr(self, inode):
172         e = self.inodes[inode]
173
174         entry = llfuse.EntryAttributes()
175         entry.st_ino = inode
176         entry.generation = 0
177         entry.entry_timeout = 300
178         entry.attr_timeout = 300
179
180         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
181         if isinstance(e, Directory):
182             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
183         else:
184             entry.st_mode |= stat.S_IFREG
185
186         entry.st_nlink = 1
187         entry.st_uid = self.uid
188         entry.st_gid = self.gid
189         entry.st_rdev = 0
190
191         entry.st_size = e.size()
192
193         entry.st_blksize = 1024
194         entry.st_blocks = e.size()/1024
195         if e.size()/1024 != 0:
196             entry.st_blocks += 1
197         entry.st_atime = 0
198         entry.st_mtime = 0
199         entry.st_ctime = 0
200
201         return entry
202
203     def lookup(self, parent_inode, name):
204         #print "lookup: parent_inode", parent_inode, "name", name
205         inode = None
206
207         if name == '.':
208             inode = parent_inode
209         else:
210             if parent_inode in self.inodes:
211                 p = self.inodes[parent_inode]
212                 if name == '..':
213                     inode = p.parent_inode
214                 elif name in p:
215                     inode = p[name].inode
216
217         if inode != None:
218             return self.getattr(inode)
219         else:
220             raise llfuse.FUSEError(errno.ENOENT)
221    
222     def open(self, inode, flags):
223         if inode in self.inodes:
224             p = self.inodes[inode]
225         else:
226             raise llfuse.FUSEError(errno.ENOENT)
227
228         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
229             raise llfuse.FUSEError(errno.EROFS)
230
231         if isinstance(p, Directory):
232             raise llfuse.FUSEError(errno.EISDIR)
233
234         fh = self._filehandles_counter
235         self._filehandles_counter += 1
236         self._filehandles[fh] = FileHandle(fh, p)
237         return fh
238
239     def read(self, fh, off, size):
240         #print "read", fh, off, size
241         if fh in self._filehandles:
242             handle = self._filehandles[fh]
243         else:
244             raise llfuse.FUSEError(errno.EBADF)
245
246         try:
247             with llfuse.lock_released:
248                 return handle.entry.reader.readfrom(off, size)
249         except:
250             raise llfuse.FUSEError(errno.EIO)
251
252     def release(self, fh):
253         if fh in self._filehandles:
254             del self._filehandles[fh]
255
256     def opendir(self, inode):
257         #print "opendir: inode", inode
258
259         if inode in self.inodes:
260             p = self.inodes[inode]
261         else:
262             raise llfuse.FUSEError(errno.ENOENT)
263
264         if not isinstance(p, Directory):
265             raise llfuse.FUSEError(errno.ENOTDIR)
266
267         fh = self._filehandles_counter
268         self._filehandles_counter += 1
269         if p.parent_inode in self.inodes:
270             parent = self.inodes[p.parent_inode]
271         else:
272             parent = None
273         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
274         return fh
275
276     def readdir(self, fh, off):
277         #print "readdir: fh", fh, "off", off
278
279         if fh in self._filehandles:
280             handle = self._filehandles[fh]
281         else:
282             raise llfuse.FUSEError(errno.EBADF)
283
284         #print "handle.entry", handle.entry
285
286         e = off
287         while e < len(handle.entry):
288             yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
289             e += 1
290
291     def releasedir(self, fh):
292         del self._filehandles[fh]
293
294     def statfs(self):
295         st = llfuse.StatvfsData()
296         st.f_bsize = 1024 * 1024
297         st.f_blocks = 0
298         st.f_files = 0
299
300         st.f_bfree = 0
301         st.f_bavail = 0
302
303         st.f_ffree = 0
304         st.f_favail = 0
305
306         st.f_frsize = 0
307         return st
308
309     # The llfuse documentation recommends only overloading functions that
310     # are actually implemented, as the default implementation will raise ENOSYS.
311     # However, there is a bug in the llfuse default implementation of create()
312     # "create() takes exactly 5 positional arguments (6 given)" which will crash
313     # arv-mount.
314     # The workaround is to implement it with the proper number of parameters,
315     # and then everything works out.
316     def create(self, p1, p2, p3, p4, p5):
317         raise llfuse.FUSEError(errno.EROFS)