9 from fusefile import StringFile, ObjectFile, FuseArvadosFile
10 from fresh import FreshBase, convertTime, use_counter
12 import arvados.collection
13 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
_logger = logging.getLogger('arvados.arvados_fuse')

# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
# Raw string: the \x00 escape is interpreted by the regex engine instead of
# embedding a literal NUL byte in the pattern source.
_disallowed_filename_characters = re.compile(r'[\x00/]')


def sanitize_filename(dirty):
    """Replace disallowed filename characters with harmless "_".

    :dirty: the candidate name, or None.

    Returns None when given None, so callers that use None to mean "skip
    this entry" (see Directory.merge) can pass their value straight
    through.  The empty string and "." become "_" and ".." becomes "__",
    because none of those can be used as the name of a directory entry
    even though they contain no disallowed character.  Any other name is
    returned with every NUL and "/" replaced by "_".
    """
    if dirty is None:
        return None
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
37 class Directory(FreshBase):
38 """Generic directory object, backed by a dict.
40 Consists of a set of entries with the key representing the filename
41 and the value referencing a File or Directory object.
44 def __init__(self, parent_inode, inodes):
45 super(Directory, self).__init__()
47 """parent_inode is the integer inode number"""
49 if not isinstance(parent_inode, int):
50 raise Exception("parent_inode should be an int")
51 self.parent_inode = parent_inode
54 self._mtime = time.time()
56 # Overridden by subclasses to implement logic to update the entries dict
57 # when the directory is stale
62 # Only used when computing the size of the disk footprint of the directory
70 def checkupdate(self):
74 except apiclient.errors.HttpError as e:
78 def __getitem__(self, item):
80 return self._entries[item]
85 return list(self._entries.items())
88 def __contains__(self, k):
90 return k in self._entries
93 self.inodes.touch(self)
94 super(Directory, self).fresh()
96 def merge(self, items, fn, same, new_entry):
97 """Helper method for updating the contents of the directory.
99 Takes a list describing the new contents of the directory, reuse
100 entries that are the same in both the old and new lists, create new
101 entries, and delete old entries missing from the new list.
103 :items: iterable with new directory contents
105 :fn: function to take an entry in 'items' and return the desired file or
106 directory name, or None if this entry should be skipped
108 :same: function to compare an existing entry (a File or Directory
109 object) with an entry in the items list to determine whether to keep
112 :new_entry: function to create a new directory entry (File or Directory
113 object) from an entry in the items list.
117 oldentries = self._entries
121 name = sanitize_filename(fn(i))
123 if name in oldentries and same(oldentries[name], i):
124 # move existing directory entry over
125 self._entries[name] = oldentries[name]
128 # create new directory entry
131 self._entries[name] = self.inodes.add_entry(ent)
134 # delete any other directory entries that were not found in 'items'
136 llfuse.invalidate_entry(self.inode, str(i))
137 self.inodes.del_entry(oldentries[i])
141 llfuse.invalidate_inode(self.inode)
142 self._mtime = time.time()
146 def clear(self, force=False):
147 """Delete all entries"""
149 if not self.in_use() or force:
150 oldentries = self._entries
153 if not oldentries[n].clear(force):
154 self._entries = oldentries
157 llfuse.invalidate_entry(self.inode, str(n))
158 self.inodes.del_entry(oldentries[n])
159 llfuse.invalidate_inode(self.inode)
174 class CollectionDirectoryBase(Directory):
def __init__(self, parent_inode, inodes, collection):
    """Set up a directory whose entries are built from `collection`.

    :parent_inode: forwarded unchanged to Directory.__init__.

    :inodes: forwarded unchanged to Directory.__init__.

    :collection: stored as self.collection.  It may be None at
      construction time; CollectionDirectory passes None and assigns the
      real object later.
    """
    # Base-class setup runs first, then the collection is attached.
    super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
    self.collection = collection
def new_entry(self, name, item, mtime):
    """Create the directory entry for one member of the collection.

    :name: member name as stored in the collection; it is sanitized
      before being used as the key in this directory.

    :item: the collection member.  A RichCollectionBase becomes a nested
      CollectionDirectoryBase, which is populated immediately; anything
      else is wrapped in a FuseArvadosFile.

    :mtime: modification time handed to the new entry.
    """
    name = sanitize_filename(name)
    if isinstance(item, arvados.collection.RichCollectionBase):
        self._entries[name] = self.inodes.add_entry(
            CollectionDirectoryBase(self.inode, self.inodes, item))
        self._entries[name].populate(mtime)
    else:
        # Only non-directory members become files.  Without this branch
        # the file entry would replace the subdirectory registered above.
        self._entries[name] = self.inodes.add_entry(
            FuseArvadosFile(self.inode, item, mtime))
187 def on_event(self, event, collection, name, item):
188 _logger.warn("Got event! %s %s %s %s", event, collection, name, item)
189 if collection == self.collection:
191 if event == arvados.collection.ADD:
192 self.new_entry(name, item, self.mtime())
193 elif event == arvados.collection.DEL:
194 ent = self._entries[name]
195 del self._entries[name]
196 llfuse.invalidate_entry(self.inode, name)
197 self.inodes.del_entry(ent)
198 elif event == arvados.collection.MOD:
199 ent = self._entries[name]
200 llfuse.invalidate_inode(ent.inode)
201 _logger.warn("Finished handling event")
203 def populate(self, mtime):
205 self.collection.subscribe(self.on_event)
206 for entry, item in self.collection.items():
207 self.new_entry(entry, item, self.mtime())
210 return self.collection.writable()
213 self.collection.root_collection().save()
215 class CollectionDirectory(CollectionDirectoryBase):
216 """Represents the root of a directory tree holding a collection."""
218 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
219 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
221 self.num_retries = num_retries
222 self.collection_object_file = None
223 self.collection_object = None
224 if isinstance(collection_record, dict):
225 self.collection_locator = collection_record['uuid']
226 self._mtime = convertTime(collection_record.get('modified_at'))
228 self.collection_locator = collection_record
230 self._manifest_size = 0
231 if self.collection_locator:
232 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
235 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
238 return self.collection.writable() if self.collection is not None else self._writable
240 # Used by arv-web.py to switch the contents of the CollectionDirectory
241 def change_collection(self, new_locator):
242 """Switch the contents of the CollectionDirectory.
244 Must be called with llfuse.lock held.
247 self.collection_locator = new_locator
248 self.collection_object = None
251 def new_collection(self, new_collection_object, coll_reader):
253 self.clear(force=True)
255 self.collection_object = new_collection_object
257 if self.collection_object:
258 self._mtime = convertTime(self.collection_object.get('modified_at'))
260 if self.collection_object_file is not None:
261 self.collection_object_file.update(self.collection_object)
263 self.collection = coll_reader
264 self.populate(self.mtime())
268 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
271 if self.collection_locator is None:
275 with llfuse.lock_released:
276 if uuid_pattern.match(self.collection_locator):
277 coll_reader = arvados.collection.Collection(
278 self.collection_locator, self.api, self.api.keep,
279 num_retries=self.num_retries)
281 coll_reader = arvados.collection.CollectionReader(
282 self.collection_locator, self.api, self.api.keep,
283 num_retries=self.num_retries)
284 new_collection_object = coll_reader.api_response() or {}
285 # If the Collection only exists in Keep, there will be no API
286 # response. Fill in the fields we need.
287 if 'uuid' not in new_collection_object:
288 new_collection_object['uuid'] = self.collection_locator
289 if "portable_data_hash" not in new_collection_object:
290 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
291 if 'manifest_text' not in new_collection_object:
292 new_collection_object['manifest_text'] = coll_reader.manifest_text()
293 # end with llfuse.lock_released, re-acquire lock
295 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
296 self.new_collection(new_collection_object, coll_reader)
298 self._manifest_size = len(coll_reader.manifest_text())
299 _logger.debug("%s manifest_size %i", self, self._manifest_size)
303 except arvados.errors.NotFoundError:
304 _logger.exception("arv-mount %s: error", self.collection_locator)
305 except arvados.errors.ArgumentError as detail:
306 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
307 if self.collection_object is not None and "manifest_text" in self.collection_object:
308 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
310 _logger.exception("arv-mount %s: error", self.collection_locator)
311 if self.collection_object is not None and "manifest_text" in self.collection_object:
312 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
315 def __getitem__(self, item):
317 if item == '.arvados#collection':
318 if self.collection_object_file is None:
319 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
320 self.inodes.add_entry(self.collection_object_file)
321 return self.collection_object_file
323 return super(CollectionDirectory, self).__getitem__(item)
def __contains__(self, k):
    """Report whether `k` names an entry of this collection directory.

    '.arvados#collection' is always reported as present: __getitem__
    builds its ObjectFile on demand and never stores it in the entries
    dict, so the inherited lookup would not find it.  Every other name is
    resolved by the parent class.
    """
    if k == '.arvados#collection':
        return True
    return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Discard the cached collection record, then run the inherited invalidate().

    Both the cached record (collection_object) and the ObjectFile built
    from it (collection_object_file) are reset to None.
    """
    # Chained assignment binds left to right, matching the original order.
    self.collection_object = self.collection_object_file = None
    super(CollectionDirectory, self).invalidate()
337 return (self.collection_locator is not None)
340 return self._manifest_size * 128
342 class MagicDirectory(Directory):
343 """A special directory that logically contains the set of all extant keep locators.
345 When a file is referenced by lookup(), it is tested to see if it is a valid
346 keep locator to a manifest, and if so, loads the manifest contents as a
347 subdirectory of this directory with the locator as the directory name.
348 Since querying a list of all extant keep locators is impractical, only
349 collections that have already been accessed are visible to readdir().
354 This directory provides access to Arvados collections as subdirectories listed
355 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
356 the form '1234567890abcdefghijklmnopqrstuv+123').
358 Note that this directory will appear empty until you attempt to access a
359 specific collection subdirectory (such as trying to 'cd' into it), at which
360 point the collection will actually be looked up on the server and the directory
361 will appear if it exists.
364 def __init__(self, parent_inode, inodes, api, num_retries):
365 super(MagicDirectory, self).__init__(parent_inode, inodes)
367 self.num_retries = num_retries
def __setattr__(self, name, value):
    """Store the attribute, seeding the directory when an inode is assigned.

    When `inode` is set to a non-None value while the directory still has
    no entries, a README file is added.  If that inode is the FUSE root
    inode, a nested MagicDirectory named 'by_id' is added as well.
    Assignments to any other attribute are simply stored.
    """
    super(MagicDirectory, self).__setattr__(name, value)

    # The name test comes first so that self.inode is not read while other
    # attributes are being assigned.
    if name != 'inode' or self.inode is None or self._entries:
        return

    # When we're assigned an inode, add a README.
    readme = StringFile(self.inode, self.README_TEXT, time.time())
    self._entries['README'] = self.inodes.add_entry(readme)

    # If we're the root directory, add an identical by_id subdirectory.
    if self.inode == llfuse.ROOT_INODE:
        by_id = MagicDirectory(
            self.inode, self.inodes, self.api, self.num_retries)
        self._entries['by_id'] = self.inodes.add_entry(by_id)
381 def __contains__(self, k):
382 if k in self._entries:
385 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
389 e = self.inodes.add_entry(CollectionDirectory(
390 self.inode, self.inodes, self.api, self.num_retries, k))
397 except Exception as e:
398 _logger.debug('arv-mount exception keep %s', e)
401 def __getitem__(self, item):
403 return self._entries[item]
405 raise KeyError("No collection with id " + item)
407 def clear(self, force=False):
411 class RecursiveInvalidateDirectory(Directory):
412 def invalidate(self):
414 super(RecursiveInvalidateDirectory, self).invalidate()
415 for a in self._entries:
416 self._entries[a].invalidate()
421 class TagsDirectory(RecursiveInvalidateDirectory):
422 """A special directory that contains as subdirectories all tags visible to the user."""
424 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
425 super(TagsDirectory, self).__init__(parent_inode, inodes)
427 self.num_retries = num_retries
429 self._poll_time = poll_time
432 with llfuse.lock_released:
433 tags = self.api.links().list(
434 filters=[['link_class', '=', 'tag']],
435 select=['name'], distinct=True
436 ).execute(num_retries=self.num_retries)
438 self.merge(tags['items'],
440 lambda a, i: a.tag == i['name'],
441 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
444 class TagDirectory(Directory):
445 """A special directory that contains as subdirectories all collections visible
446 to the user that are tagged with a particular tag.
449 def __init__(self, parent_inode, inodes, api, num_retries, tag,
450 poll=False, poll_time=60):
451 super(TagDirectory, self).__init__(parent_inode, inodes)
453 self.num_retries = num_retries
456 self._poll_time = poll_time
459 with llfuse.lock_released:
460 taggedcollections = self.api.links().list(
461 filters=[['link_class', '=', 'tag'],
462 ['name', '=', self.tag],
463 ['head_uuid', 'is_a', 'arvados#collection']],
465 ).execute(num_retries=self.num_retries)
466 self.merge(taggedcollections['items'],
467 lambda i: i['head_uuid'],
468 lambda a, i: a.collection_locator == i['head_uuid'],
469 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
472 class ProjectDirectory(Directory):
473 """A special directory that contains the contents of a project."""
475 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
476 poll=False, poll_time=60):
477 super(ProjectDirectory, self).__init__(parent_inode, inodes)
479 self.num_retries = num_retries
480 self.project_object = project_object
481 self.project_object_file = None
482 self.uuid = project_object['uuid']
484 self._poll_time = poll_time
486 def createDirectory(self, i):
487 if collection_uuid_pattern.match(i['uuid']):
488 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
489 elif group_uuid_pattern.match(i['uuid']):
490 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
491 elif link_uuid_pattern.match(i['uuid']):
492 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
493 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
496 elif uuid_pattern.match(i['uuid']):
497 return ObjectFile(self.parent_inode, i)
502 if self.project_object_file == None:
503 self.project_object_file = ObjectFile(self.inode, self.project_object)
504 self.inodes.add_entry(self.project_object_file)
508 if i['name'] is None or len(i['name']) == 0:
510 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
511 # collection or subproject
513 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
516 elif 'kind' in i and i['kind'].startswith('arvados#'):
518 return "{}.{}".format(i['name'], i['kind'][8:])
523 if isinstance(a, CollectionDirectory):
524 return a.collection_locator == i['uuid']
525 elif isinstance(a, ProjectDirectory):
526 return a.uuid == i['uuid']
527 elif isinstance(a, ObjectFile):
528 return a.uuid == i['uuid'] and not a.stale()
531 with llfuse.lock_released:
532 if group_uuid_pattern.match(self.uuid):
533 self.project_object = self.api.groups().get(
534 uuid=self.uuid).execute(num_retries=self.num_retries)
535 elif user_uuid_pattern.match(self.uuid):
536 self.project_object = self.api.users().get(
537 uuid=self.uuid).execute(num_retries=self.num_retries)
539 contents = arvados.util.list_all(self.api.groups().contents,
540 self.num_retries, uuid=self.uuid)
542 # end with llfuse.lock_released, re-acquire lock
547 self.createDirectory)
549 def __getitem__(self, item):
551 if item == '.arvados#project':
552 return self.project_object_file
554 return super(ProjectDirectory, self).__getitem__(item)
def __contains__(self, k):
    """Report whether `k` names an entry of this project directory.

    '.arvados#project' is always reported as present: __getitem__ serves
    it from project_object_file rather than from the entries dict, so the
    inherited lookup would not find it.  Every other name is resolved by
    the parent class.
    """
    if k == '.arvados#project':
        return True
    return super(ProjectDirectory, self).__contains__(k)
566 return len(self.project_object) * 1024 if self.project_object else 0
568 class SharedDirectory(Directory):
569 """A special directory that represents users or groups who have shared projects with me."""
571 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
572 poll=False, poll_time=60):
573 super(SharedDirectory, self).__init__(parent_inode, inodes)
575 self.num_retries = num_retries
576 self.current_user = api.users().current().execute(num_retries=num_retries)
578 self._poll_time = poll_time
581 with llfuse.lock_released:
582 all_projects = arvados.util.list_all(
583 self.api.groups().list, self.num_retries,
584 filters=[['group_class','=','project']])
586 for ob in all_projects:
587 objects[ob['uuid']] = ob
591 for ob in all_projects:
592 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
594 root_owners[ob['owner_uuid']] = True
596 lusers = arvados.util.list_all(
597 self.api.users().list, self.num_retries,
598 filters=[['uuid','in', list(root_owners)]])
599 lgroups = arvados.util.list_all(
600 self.api.groups().list, self.num_retries,
601 filters=[['uuid','in', list(root_owners)]])
607 objects[l["uuid"]] = l
609 objects[l["uuid"]] = l
612 for r in root_owners:
616 contents[obr["name"]] = obr
617 if "first_name" in obr:
618 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
621 if r['owner_uuid'] not in objects:
622 contents[r['name']] = r
624 # end with llfuse.lock_released, re-acquire lock
627 self.merge(contents.items(),
629 lambda a, i: a.uuid == i[1]['uuid'],
630 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))