9 from fusefile import StringFile, StreamReaderFile, ObjectFile
10 from fresh import FreshBase, convertTime
12 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
14 _logger = logging.getLogger('arvados.arvados_fuse')
17 # Match any character which FUSE or Linux cannot accommodate as part
18 # of a filename. (If present in a collection filename, they will
19 # appear as underscores in the fuse mount.)
20 _disallowed_filename_characters = re.compile('[\x00/]')
22 def sanitize_filename(dirty):
23 """Replace disallowed filename characters with harmless "_"."""
33 return _disallowed_filename_characters.sub('_', dirty)
35 def use_counter(orig_func):
36 @functools.wraps(orig_func)
37 def use_counter_wrapper(self, *args, **kwargs):
40 return orig_func(self, *args, **kwargs)
43 return use_counter_wrapper
46 class Directory(FreshBase):
47 """Generic directory object, backed by a dict.
49 Consists of a set of entries with the key representing the filename
50 and the value referencing a File or Directory object.
53 def __init__(self, parent_inode, inodes):
54 super(Directory, self).__init__()
56 """parent_inode is the integer inode number"""
58 if not isinstance(parent_inode, int):
59 raise Exception("parent_inode should be an int")
60 self.parent_inode = parent_inode
63 self._mtime = time.time()
66 # Overriden by subclasses to implement logic to update the entries dict
67 # when the directory is stale
72 # Only used when computing the size of the disk footprint of the directory
78 return self.use_count > 0
86 def checkupdate(self):
90 except apiclient.errors.HttpError as e:
94 def __getitem__(self, item):
96 return self._entries[item]
101 return list(self._entries.items())
104 def __contains__(self, k):
106 return k in self._entries
109 self.inodes.touch(self)
110 super(Directory, self).fresh()
112 def merge(self, items, fn, same, new_entry):
113 """Helper method for updating the contents of the directory.
115 Takes a list describing the new contents of the directory, reuse
116 entries that are the same in both the old and new lists, create new
117 entries, and delete old entries missing from the new list.
119 :items: iterable with new directory contents
121 :fn: function to take an entry in 'items' and return the desired file or
122 directory name, or None if this entry should be skipped
124 :same: function to compare an existing entry (a File or Directory
125 object) with an entry in the items list to determine whether to keep
128 :new_entry: function to create a new directory entry (File or Directory
129 object) from an entry in the items list.
133 oldentries = self._entries
137 name = sanitize_filename(fn(i))
139 if name in oldentries and same(oldentries[name], i):
140 # move existing directory entry over
141 self._entries[name] = oldentries[name]
144 # create new directory entry
147 self._entries[name] = self.inodes.add_entry(ent)
150 # delete any other directory entries that were not in found in 'items'
152 llfuse.invalidate_entry(self.inode, str(i))
153 self.inodes.del_entry(oldentries[i])
157 self._mtime = time.time()
161 def clear(self, force=False):
162 """Delete all entries"""
164 if not self.in_use() or force:
165 oldentries = self._entries
168 if isinstance(n, Directory):
169 if not n.clear(force):
170 self._entries = oldentries
173 if isinstance(n, Directory):
174 llfuse.invalidate_entry(self.inode, str(n))
175 self.inodes.del_entry(oldentries[n])
176 llfuse.invalidate_inode(self.inode)
186 class CollectionDirectory(Directory):
187 """Represents the root of a directory tree holding a collection."""
189 def __init__(self, parent_inode, inodes, api, num_retries, collection):
190 super(CollectionDirectory, self).__init__(parent_inode, inodes)
192 self.num_retries = num_retries
193 self.collection_object_file = None
194 self.collection_object = None
195 if isinstance(collection, dict):
196 self.collection_locator = collection['uuid']
197 self._mtime = convertTime(collection.get('modified_at'))
199 self.collection_locator = collection
203 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
205 # Used by arv-web.py to switch the contents of the CollectionDirectory
206 def change_collection(self, new_locator):
207 """Switch the contents of the CollectionDirectory.
209 Must be called with llfuse.lock held.
212 self.collection_locator = new_locator
213 self.collection_object = None
216 def new_collection(self, new_collection_object, coll_reader):
217 self.collection_object = new_collection_object
219 self._mtime = convertTime(self.collection_object.get('modified_at'))
221 if self.collection_object_file is not None:
222 self.collection_object_file.update(self.collection_object)
224 self.clear(force=True)
225 for s in coll_reader.all_streams():
227 for part in s.name().split('/'):
228 if part != '' and part != '.':
229 partname = sanitize_filename(part)
230 if partname not in cwd._entries:
231 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
232 # (hack until using new API)
233 cwd._entries[partname].inc_use()
235 cwd = cwd._entries[partname]
236 for k, v in s.files().items():
237 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
241 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
244 if self.collection_locator is None:
248 with llfuse.lock_released:
249 coll_reader = arvados.CollectionReader(
250 self.collection_locator, self.api, self.api.keep,
251 num_retries=self.num_retries)
252 new_collection_object = coll_reader.api_response() or {}
253 # If the Collection only exists in Keep, there will be no API
254 # response. Fill in the fields we need.
255 if 'uuid' not in new_collection_object:
256 new_collection_object['uuid'] = self.collection_locator
257 if "portable_data_hash" not in new_collection_object:
258 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
259 if 'manifest_text' not in new_collection_object:
260 new_collection_object['manifest_text'] = coll_reader.manifest_text()
261 coll_reader.normalize()
262 # end with llfuse.lock_released, re-acquire lock
264 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
265 self.new_collection(new_collection_object, coll_reader)
269 except arvados.errors.NotFoundError:
270 _logger.exception("arv-mount %s: error", self.collection_locator)
271 except arvados.errors.ArgumentError as detail:
272 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
273 if self.collection_object is not None and "manifest_text" in self.collection_object:
274 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
276 _logger.exception("arv-mount %s: error", self.collection_locator)
277 if self.collection_object is not None and "manifest_text" in self.collection_object:
278 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
281 def __getitem__(self, item):
283 if item == '.arvados#collection':
284 if self.collection_object_file is None:
285 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
286 self.inodes.add_entry(self.collection_object_file)
287 return self.collection_object_file
289 return super(CollectionDirectory, self).__getitem__(item)
291 def __contains__(self, k):
292 if k == '.arvados#collection':
295 return super(CollectionDirectory, self).__contains__(k)
297 def invalidate(self):
298 super(CollectionDirectory, self).invalidate()
299 self.collection_object = None
301 def clear(self, force=False):
302 if self.collection_locator is None:
305 return super(CollectionDirectory, self).clear(force)
308 class MagicDirectory(Directory):
309 """A special directory that logically contains the set of all extant keep locators.
311 When a file is referenced by lookup(), it is tested to see if it is a valid
312 keep locator to a manifest, and if so, loads the manifest contents as a
313 subdirectory of this directory with the locator as the directory name.
314 Since querying a list of all extant keep locators is impractical, only
315 collections that have already been accessed are visible to readdir().
320 This directory provides access to Arvados collections as subdirectories listed
321 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
322 the form '1234567890abcdefghijklmnopqrstuv+123').
324 Note that this directory will appear empty until you attempt to access a
325 specific collection subdirectory (such as trying to 'cd' into it), at which
326 point the collection will actually be looked up on the server and the directory
327 will appear if it exists.
330 def __init__(self, parent_inode, inodes, api, num_retries):
331 super(MagicDirectory, self).__init__(parent_inode, inodes)
333 self.num_retries = num_retries
335 def __setattr__(self, name, value):
336 super(MagicDirectory, self).__setattr__(name, value)
337 # When we're assigned an inode, add a README.
338 if ((name == 'inode') and (self.inode is not None) and
339 (not self._entries)):
340 self._entries['README'] = self.inodes.add_entry(
341 StringFile(self.inode, self.README_TEXT, time.time()))
342 # If we're the root directory, add an identical by_id subdirectory.
343 if self.inode == llfuse.ROOT_INODE:
344 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
345 self.inode, self.inodes, self.api, self.num_retries))
347 def __contains__(self, k):
348 if k in self._entries:
351 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
355 e = self.inodes.add_entry(CollectionDirectory(
356 self.inode, self.inodes, self.api, self.num_retries, k))
363 except Exception as e:
364 _logger.debug('arv-mount exception keep %s', e)
367 def __getitem__(self, item):
369 return self._entries[item]
371 raise KeyError("No collection with id " + item)
373 def clear(self, force=False):
377 class RecursiveInvalidateDirectory(Directory):
378 def invalidate(self):
380 super(RecursiveInvalidateDirectory, self).invalidate()
381 for a in self._entries:
382 self._entries[a].invalidate()
387 class TagsDirectory(RecursiveInvalidateDirectory):
388 """A special directory that contains as subdirectories all tags visible to the user."""
390 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
391 super(TagsDirectory, self).__init__(parent_inode, inodes)
393 self.num_retries = num_retries
395 self._poll_time = poll_time
398 with llfuse.lock_released:
399 tags = self.api.links().list(
400 filters=[['link_class', '=', 'tag']],
401 select=['name'], distinct=True
402 ).execute(num_retries=self.num_retries)
404 self.merge(tags['items'],
406 lambda a, i: a.tag == i['name'],
407 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
410 class TagDirectory(Directory):
411 """A special directory that contains as subdirectories all collections visible
412 to the user that are tagged with a particular tag.
415 def __init__(self, parent_inode, inodes, api, num_retries, tag,
416 poll=False, poll_time=60):
417 super(TagDirectory, self).__init__(parent_inode, inodes)
419 self.num_retries = num_retries
422 self._poll_time = poll_time
425 with llfuse.lock_released:
426 taggedcollections = self.api.links().list(
427 filters=[['link_class', '=', 'tag'],
428 ['name', '=', self.tag],
429 ['head_uuid', 'is_a', 'arvados#collection']],
431 ).execute(num_retries=self.num_retries)
432 self.merge(taggedcollections['items'],
433 lambda i: i['head_uuid'],
434 lambda a, i: a.collection_locator == i['head_uuid'],
435 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
438 class ProjectDirectory(Directory):
439 """A special directory that contains the contents of a project."""
441 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
442 poll=False, poll_time=60):
443 super(ProjectDirectory, self).__init__(parent_inode, inodes)
445 self.num_retries = num_retries
446 self.project_object = project_object
447 self.project_object_file = None
448 self.uuid = project_object['uuid']
450 self._poll_time = poll_time
452 def createDirectory(self, i):
453 if collection_uuid_pattern.match(i['uuid']):
454 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
455 elif group_uuid_pattern.match(i['uuid']):
456 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
457 elif link_uuid_pattern.match(i['uuid']):
458 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
459 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
462 elif uuid_pattern.match(i['uuid']):
463 return ObjectFile(self.parent_inode, i)
468 if self.project_object_file == None:
469 self.project_object_file = ObjectFile(self.inode, self.project_object)
470 self.inodes.add_entry(self.project_object_file)
474 if i['name'] is None or len(i['name']) == 0:
476 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
477 # collection or subproject
479 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
482 elif 'kind' in i and i['kind'].startswith('arvados#'):
484 return "{}.{}".format(i['name'], i['kind'][8:])
489 if isinstance(a, CollectionDirectory):
490 return a.collection_locator == i['uuid']
491 elif isinstance(a, ProjectDirectory):
492 return a.uuid == i['uuid']
493 elif isinstance(a, ObjectFile):
494 return a.uuid == i['uuid'] and not a.stale()
497 with llfuse.lock_released:
498 if group_uuid_pattern.match(self.uuid):
499 self.project_object = self.api.groups().get(
500 uuid=self.uuid).execute(num_retries=self.num_retries)
501 elif user_uuid_pattern.match(self.uuid):
502 self.project_object = self.api.users().get(
503 uuid=self.uuid).execute(num_retries=self.num_retries)
505 contents = arvados.util.list_all(self.api.groups().contents,
506 self.num_retries, uuid=self.uuid)
508 # end with llfuse.lock_released, re-acquire lock
513 self.createDirectory)
515 def __getitem__(self, item):
517 if item == '.arvados#project':
518 return self.project_object_file
520 return super(ProjectDirectory, self).__getitem__(item)
522 def __contains__(self, k):
523 if k == '.arvados#project':
526 return super(ProjectDirectory, self).__contains__(k)
529 class SharedDirectory(Directory):
530 """A special directory that represents users or groups who have shared projects with me."""
532 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
533 poll=False, poll_time=60):
534 super(SharedDirectory, self).__init__(parent_inode, inodes)
536 self.num_retries = num_retries
537 self.current_user = api.users().current().execute(num_retries=num_retries)
539 self._poll_time = poll_time
542 with llfuse.lock_released:
543 all_projects = arvados.util.list_all(
544 self.api.groups().list, self.num_retries,
545 filters=[['group_class','=','project']])
547 for ob in all_projects:
548 objects[ob['uuid']] = ob
552 for ob in all_projects:
553 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
555 root_owners[ob['owner_uuid']] = True
557 lusers = arvados.util.list_all(
558 self.api.users().list, self.num_retries,
559 filters=[['uuid','in', list(root_owners)]])
560 lgroups = arvados.util.list_all(
561 self.api.groups().list, self.num_retries,
562 filters=[['uuid','in', list(root_owners)]])
568 objects[l["uuid"]] = l
570 objects[l["uuid"]] = l
573 for r in root_owners:
577 contents[obr["name"]] = obr
578 if "first_name" in obr:
579 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
582 if r['owner_uuid'] not in objects:
583 contents[r['name']] = r
585 # end with llfuse.lock_released, re-acquire lock
588 self.merge(contents.items(),
590 lambda a, i: a.uuid == i[1]['uuid'],
591 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))