3198: Start by refactoring.
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24
25 from fusedir import sanitize_filename, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
26
27 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
28
29
30 _logger = logging.getLogger('arvados.arvados_fuse')
31
32
33 class FileHandle(object):
34     """Connects a numeric file handle to a File or Directory object that has
35     been opened by the client."""
36
37     def __init__(self, fh, entry):
38         self.fh = fh
39         self.entry = entry
40
41
42 class Inodes(object):
43     """Manage the set of inodes.  This is the mapping from a numeric id
44     to a concrete File or Directory object"""
45
46     def __init__(self):
47         self._entries = {}
48         self._counter = itertools.count(llfuse.ROOT_INODE)
49
50     def __getitem__(self, item):
51         return self._entries[item]
52
53     def __setitem__(self, key, item):
54         self._entries[key] = item
55
56     def __iter__(self):
57         return self._entries.iterkeys()
58
59     def items(self):
60         return self._entries.items()
61
62     def __contains__(self, k):
63         return k in self._entries
64
65     def add_entry(self, entry):
66         entry.inode = next(self._counter)
67         self._entries[entry.inode] = entry
68         return entry
69
70     def del_entry(self, entry):
71         llfuse.invalidate_inode(entry.inode)
72         del self._entries[entry.inode]
73
74
75 class Operations(llfuse.Operations):
76     """This is the main interface with llfuse.
77
78     The methods on this object are called by llfuse threads to service FUSE
79     events to query and read from the file system.
80
81     llfuse has its own global lock which is acquired before calling a request handler,
82     so request handlers do not run concurrently unless the lock is explicitly released
83     using 'with llfuse.lock_released:'
84
85     """
86
87     def __init__(self, uid, gid, encoding="utf-8"):
88         super(Operations, self).__init__()
89
90         self.inodes = Inodes()
91         self.uid = uid
92         self.gid = gid
93         self.encoding = encoding
94
95         # dict of inode to filehandle
96         self._filehandles = {}
97         self._filehandles_counter = 1
98
99         # Other threads that need to wait until the fuse driver
100         # is fully initialized should wait() on this event object.
101         self.initlock = threading.Event()
102
103     def init(self):
104         # Allow threads that are waiting for the driver to be finished
105         # initializing to continue
106         self.initlock.set()
107
108     def access(self, inode, mode, ctx):
109         return True
110
111     def getattr(self, inode):
112         if inode not in self.inodes:
113             raise llfuse.FUSEError(errno.ENOENT)
114
115         e = self.inodes[inode]
116
117         entry = llfuse.EntryAttributes()
118         entry.st_ino = inode
119         entry.generation = 0
120         entry.entry_timeout = 300
121         entry.attr_timeout = 300
122
123         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
124         if isinstance(e, Directory):
125             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
126         elif isinstance(e, StreamReaderFile):
127             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
128         else:
129             entry.st_mode |= stat.S_IFREG
130
131         entry.st_nlink = 1
132         entry.st_uid = self.uid
133         entry.st_gid = self.gid
134         entry.st_rdev = 0
135
136         entry.st_size = e.size()
137
138         entry.st_blksize = 512
139         entry.st_blocks = (e.size()/512)+1
140         entry.st_atime = int(e.atime())
141         entry.st_mtime = int(e.mtime())
142         entry.st_ctime = int(e.mtime())
143
144         return entry
145
146     def lookup(self, parent_inode, name):
147         name = unicode(name, self.encoding)
148         _logger.debug("arv-mount lookup: parent_inode %i name %s",
149                       parent_inode, name)
150         inode = None
151
152         if name == '.':
153             inode = parent_inode
154         else:
155             if parent_inode in self.inodes:
156                 p = self.inodes[parent_inode]
157                 if name == '..':
158                     inode = p.parent_inode
159                 elif isinstance(p, Directory) and name in p:
160                     inode = p[name].inode
161
162         if inode != None:
163             return self.getattr(inode)
164         else:
165             raise llfuse.FUSEError(errno.ENOENT)
166
167     def open(self, inode, flags):
168         if inode in self.inodes:
169             p = self.inodes[inode]
170         else:
171             raise llfuse.FUSEError(errno.ENOENT)
172
173         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
174             raise llfuse.FUSEError(errno.EROFS)
175
176         if isinstance(p, Directory):
177             raise llfuse.FUSEError(errno.EISDIR)
178
179         fh = self._filehandles_counter
180         self._filehandles_counter += 1
181         self._filehandles[fh] = FileHandle(fh, p)
182         return fh
183
184     def read(self, fh, off, size):
185         _logger.debug("arv-mount read %i %i %i", fh, off, size)
186         if fh in self._filehandles:
187             handle = self._filehandles[fh]
188         else:
189             raise llfuse.FUSEError(errno.EBADF)
190
191         # update atime
192         handle.entry._atime = time.time()
193
194         try:
195             with llfuse.lock_released:
196                 return handle.entry.readfrom(off, size)
197         except arvados.errors.NotFoundError as e:
198             _logger.warning("Block not found: " + str(e))
199             raise llfuse.FUSEError(errno.EIO)
200         except Exception:
201             _logger.exception()
202             raise llfuse.FUSEError(errno.EIO)
203
204     def release(self, fh):
205         if fh in self._filehandles:
206             del self._filehandles[fh]
207
208     def opendir(self, inode):
209         _logger.debug("arv-mount opendir: inode %i", inode)
210
211         if inode in self.inodes:
212             p = self.inodes[inode]
213         else:
214             raise llfuse.FUSEError(errno.ENOENT)
215
216         if not isinstance(p, Directory):
217             raise llfuse.FUSEError(errno.ENOTDIR)
218
219         fh = self._filehandles_counter
220         self._filehandles_counter += 1
221         if p.parent_inode in self.inodes:
222             parent = self.inodes[p.parent_inode]
223         else:
224             raise llfuse.FUSEError(errno.EIO)
225
226         # update atime
227         p._atime = time.time()
228
229         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
230         return fh
231
232     def readdir(self, fh, off):
233         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
234
235         if fh in self._filehandles:
236             handle = self._filehandles[fh]
237         else:
238             raise llfuse.FUSEError(errno.EBADF)
239
240         _logger.debug("arv-mount handle.entry %s", handle.entry)
241
242         e = off
243         while e < len(handle.entry):
244             if handle.entry[e][1].inode in self.inodes:
245                 try:
246                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
247                 except UnicodeEncodeError:
248                     pass
249             e += 1
250
251     def releasedir(self, fh):
252         del self._filehandles[fh]
253
254     def statfs(self):
255         st = llfuse.StatvfsData()
256         st.f_bsize = 64 * 1024
257         st.f_blocks = 0
258         st.f_files = 0
259
260         st.f_bfree = 0
261         st.f_bavail = 0
262
263         st.f_ffree = 0
264         st.f_favail = 0
265
266         st.f_frsize = 0
267         return st
268
269     # The llfuse documentation recommends only overloading functions that
270     # are actually implemented, as the default implementation will raise ENOSYS.
271     # However, there is a bug in the llfuse default implementation of create()
272     # "create() takes exactly 5 positional arguments (6 given)" which will crash
273     # arv-mount.
274     # The workaround is to implement it with the proper number of parameters,
275     # and then everything works out.
276     def create(self, inode_parent, name, mode, flags, ctx):
277         raise llfuse.FUSEError(errno.EROFS)