9 from fusefile import StringFile, ObjectFile, FuseArvadosFile
10 from fresh import FreshBase, convertTime, use_counter
12 import arvados.collection
13 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
15 _logger = logging.getLogger('arvados.arvados_fuse')
18 # Match any character which FUSE or Linux cannot accommodate as part
19 # of a filename. (If present in a collection filename, they will
20 # appear as underscores in the fuse mount.)
21 _disallowed_filename_characters = re.compile('[\x00/]')
23 def sanitize_filename(dirty):
24 """Replace disallowed filename characters with harmless "_"."""
34 return _disallowed_filename_characters.sub('_', dirty)
37 class Directory(FreshBase):
38 """Generic directory object, backed by a dict.
40 Consists of a set of entries with the key representing the filename
41 and the value referencing a File or Directory object.
44 def __init__(self, parent_inode, inodes):
45 super(Directory, self).__init__()
47 """parent_inode is the integer inode number"""
49 if not isinstance(parent_inode, int):
50 raise Exception("parent_inode should be an int")
51 self.parent_inode = parent_inode
54 self._mtime = time.time()
56 # Overridden by subclasses to implement logic to update the entries dict
57 # when the directory is stale
62 # Only used when computing the size of the disk footprint of the directory
70 def checkupdate(self):
74 except apiclient.errors.HttpError as e:
78 def __getitem__(self, item):
80 return self._entries[item]
85 return list(self._entries.items())
88 def __contains__(self, k):
90 return k in self._entries
93 self.inodes.touch(self)
94 super(Directory, self).fresh()
96 def merge(self, items, fn, same, new_entry):
97 """Helper method for updating the contents of the directory.
99 Takes a list describing the new contents of the directory, reuses
100 entries that are the same in both the old and new lists, creates new
101 entries, and deletes old entries missing from the new list.
103 :items: iterable with new directory contents
105 :fn: function to take an entry in 'items' and return the desired file or
106 directory name, or None if this entry should be skipped
108 :same: function to compare an existing entry (a File or Directory
109 object) with an entry in the items list to determine whether to keep
112 :new_entry: function to create a new directory entry (File or Directory
113 object) from an entry in the items list.
117 oldentries = self._entries
121 name = sanitize_filename(fn(i))
123 if name in oldentries and same(oldentries[name], i):
124 # move existing directory entry over
125 self._entries[name] = oldentries[name]
128 # create new directory entry
131 self._entries[name] = self.inodes.add_entry(ent)
134 # delete any other directory entries that were not found in 'items'
136 llfuse.invalidate_entry(self.inode, str(i))
137 self.inodes.del_entry(oldentries[i])
141 llfuse.invalidate_inode(self.inode)
142 self._mtime = time.time()
146 def clear(self, force=False):
147 """Delete all entries"""
149 if not self.in_use() or force:
150 oldentries = self._entries
153 if not oldentries[n].clear(force):
154 self._entries = oldentries
157 llfuse.invalidate_entry(self.inode, str(n))
158 self.inodes.del_entry(oldentries[n])
159 llfuse.invalidate_inode(self.inode)
174 class CollectionDirectoryBase(Directory):
def __init__(self, parent_inode, inodes, collection):
    """Create a directory node whose contents come from a collection object.

    :parent_inode: passed through unchanged to the Directory constructor

    :inodes: passed through unchanged to the Directory constructor

    :collection: kept as self.collection; populate() subscribes to it and
      iterates its items.  May be None at construction time
      (CollectionDirectory passes None and assigns the real collection
      later).
    """
    # Let the generic Directory constructor run first, then remember the
    # backing collection.
    super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
    self.collection = collection
179 def new_entry(self, name, item, mtime):
180 name = sanitize_filename(name)
181 if isinstance(item, arvados.collection.RichCollectionBase):
182 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
183 self._entries[name].populate(mtime)
185 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
187 def on_event(self, event, collection, name, item):
188 _logger.warn("Got event! %s %s %s %s", event, collection, name, item)
189 if collection == self.collection:
191 if event == arvados.collection.ADD:
192 self.new_entry(name, item, self.mtime())
193 elif event == arvados.collection.DEL:
194 ent = self._entries[name]
195 del self._entries[name]
196 llfuse.invalidate_entry(self.inode, name)
197 self.inodes.del_entry(ent)
198 elif event == arvados.collection.MOD:
199 ent = self._entries[name]
200 llfuse.invalidate_inode(ent.inode)
201 _logger.warn("Finished handling event")
203 def populate(self, mtime):
205 self.collection.subscribe(self.on_event)
206 for entry, item in self.collection.items():
207 self.new_entry(entry, item, self.mtime())
210 return self.collection.writable()
213 self.collection.root_collection().save()
215 class CollectionDirectory(CollectionDirectoryBase):
216 """Represents the root of a directory tree holding a collection."""
218 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
219 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
221 self.num_retries = num_retries
222 self.collection_record_file = None
223 self.collection_record = None
224 if isinstance(collection_record, dict):
225 self.collection_locator = collection_record['uuid']
226 self._mtime = convertTime(collection_record.get('modified_at'))
228 self.collection_locator = collection_record
230 self._manifest_size = 0
231 if self.collection_locator:
232 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
235 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
238 return self.collection.writable() if self.collection is not None else self._writable
240 # Used by arv-web.py to switch the contents of the CollectionDirectory
241 def change_collection(self, new_locator):
242 """Switch the contents of the CollectionDirectory.
244 Must be called with llfuse.lock held.
247 self.collection_locator = new_locator
248 self.collection_record = None
251 def new_collection(self, new_collection_record, coll_reader):
253 self.clear(force=True)
255 self.collection_record = new_collection_record
257 if self.collection_record:
258 self._mtime = convertTime(self.collection_record.get('modified_at'))
259 self.collection_locator = self.collection_record["uuid"]
260 if self.collection_record_file is not None:
261 self.collection_record_file.update(self.collection_record)
263 self.collection = coll_reader
264 self.populate(self.mtime())
267 return self.collection_locator
271 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
274 if self.collection_locator is None:
278 with llfuse.lock_released:
279 _logger.debug("Updating %s", self.collection_locator)
281 self.collection.update()
283 if uuid_pattern.match(self.collection_locator):
284 coll_reader = arvados.collection.Collection(
285 self.collection_locator, self.api, self.api.keep,
286 num_retries=self.num_retries)
288 coll_reader = arvados.collection.CollectionReader(
289 self.collection_locator, self.api, self.api.keep,
290 num_retries=self.num_retries)
291 new_collection_record = coll_reader.api_response() or {}
292 # If the Collection only exists in Keep, there will be no API
293 # response. Fill in the fields we need.
294 if 'uuid' not in new_collection_record:
295 new_collection_record['uuid'] = self.collection_locator
296 if "portable_data_hash" not in new_collection_record:
297 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
298 if 'manifest_text' not in new_collection_record:
299 new_collection_record['manifest_text'] = coll_reader.manifest_text()
301 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
302 self.new_collection(new_collection_record, coll_reader)
304 self._manifest_size = len(coll_reader.manifest_text())
305 _logger.debug("%s manifest_size %i", self, self._manifest_size)
306 # end with llfuse.lock_released, re-acquire lock
310 except arvados.errors.NotFoundError:
311 _logger.exception("arv-mount %s: error", self.collection_locator)
312 except arvados.errors.ArgumentError as detail:
313 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
314 if self.collection_record is not None and "manifest_text" in self.collection_record:
315 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
317 _logger.exception("arv-mount %s: error", self.collection_locator)
318 if self.collection_record is not None and "manifest_text" in self.collection_record:
319 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
322 def __getitem__(self, item):
324 if item == '.arvados#collection':
325 if self.collection_record_file is None:
326 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
327 self.inodes.add_entry(self.collection_record_file)
328 return self.collection_record_file
330 return super(CollectionDirectory, self).__getitem__(item)
332 def __contains__(self, k):
333 if k == '.arvados#collection':
336 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Forget the cached collection record, then invalidate as usual.

    Clears both the cached record and the ObjectFile that __getitem__
    hands out for '.arvados#collection', then delegates to the parent
    class's invalidate().
    """
    # Chained assignment keeps the original order: collection_record is
    # cleared first, collection_record_file second.
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
344 return (self.collection_locator is not None)
347 return self._manifest_size * 128
349 class MagicDirectory(Directory):
350 """A special directory that logically contains the set of all extant keep locators.
352 When a file is referenced by lookup(), it is tested to see if it is a valid
353 keep locator to a manifest, and if so, loads the manifest contents as a
354 subdirectory of this directory with the locator as the directory name.
355 Since querying a list of all extant keep locators is impractical, only
356 collections that have already been accessed are visible to readdir().
361 This directory provides access to Arvados collections as subdirectories listed
362 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
363 the form '1234567890abcdefghijklmnopqrstuv+123').
365 Note that this directory will appear empty until you attempt to access a
366 specific collection subdirectory (such as trying to 'cd' into it), at which
367 point the collection will actually be looked up on the server and the directory
368 will appear if it exists.
371 def __init__(self, parent_inode, inodes, api, num_retries):
372 super(MagicDirectory, self).__init__(parent_inode, inodes)
374 self.num_retries = num_retries
def __setattr__(self, name, value):
    """Set an attribute and seed the directory once it receives an inode.

    The attribute is always stored through the parent class first.  When
    the attribute being set is 'inode', the resulting inode is not None,
    and the directory has no entries yet, a README file is added.  If the
    inode is the filesystem root, a nested MagicDirectory named 'by_id' is
    added as well.
    """
    super(MagicDirectory, self).__setattr__(name, value)

    # Nothing more to do unless this assignment just gave an empty
    # directory a real inode number.
    if name != 'inode' or self.inode is None or self._entries:
        return

    # When we're assigned an inode, add a README.
    readme = StringFile(self.inode, self.README_TEXT, time.time())
    self._entries['README'] = self.inodes.add_entry(readme)

    # If we're the root directory, add an identical by_id subdirectory.
    if self.inode == llfuse.ROOT_INODE:
        by_id = MagicDirectory(
            self.inode, self.inodes, self.api, self.num_retries)
        self._entries['by_id'] = self.inodes.add_entry(by_id)
388 def __contains__(self, k):
389 if k in self._entries:
392 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
396 e = self.inodes.add_entry(CollectionDirectory(
397 self.inode, self.inodes, self.api, self.num_retries, k))
404 except Exception as e:
405 _logger.debug('arv-mount exception keep %s', e)
408 def __getitem__(self, item):
410 return self._entries[item]
412 raise KeyError("No collection with id " + item)
414 def clear(self, force=False):
418 class RecursiveInvalidateDirectory(Directory):
419 def invalidate(self):
421 super(RecursiveInvalidateDirectory, self).invalidate()
422 for a in self._entries:
423 self._entries[a].invalidate()
428 class TagsDirectory(RecursiveInvalidateDirectory):
429 """A special directory that contains as subdirectories all tags visible to the user."""
431 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
432 super(TagsDirectory, self).__init__(parent_inode, inodes)
434 self.num_retries = num_retries
436 self._poll_time = poll_time
439 with llfuse.lock_released:
440 tags = self.api.links().list(
441 filters=[['link_class', '=', 'tag']],
442 select=['name'], distinct=True
443 ).execute(num_retries=self.num_retries)
445 self.merge(tags['items'],
447 lambda a, i: a.tag == i['name'],
448 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
451 class TagDirectory(Directory):
452 """A special directory that contains as subdirectories all collections visible
453 to the user that are tagged with a particular tag.
456 def __init__(self, parent_inode, inodes, api, num_retries, tag,
457 poll=False, poll_time=60):
458 super(TagDirectory, self).__init__(parent_inode, inodes)
460 self.num_retries = num_retries
463 self._poll_time = poll_time
466 with llfuse.lock_released:
467 taggedcollections = self.api.links().list(
468 filters=[['link_class', '=', 'tag'],
469 ['name', '=', self.tag],
470 ['head_uuid', 'is_a', 'arvados#collection']],
472 ).execute(num_retries=self.num_retries)
473 self.merge(taggedcollections['items'],
474 lambda i: i['head_uuid'],
475 lambda a, i: a.collection_locator == i['head_uuid'],
476 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
479 class ProjectDirectory(Directory):
480 """A special directory that contains the contents of a project."""
482 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
483 poll=False, poll_time=60):
484 super(ProjectDirectory, self).__init__(parent_inode, inodes)
486 self.num_retries = num_retries
487 self.project_object = project_object
488 self.project_object_file = None
489 self.uuid = project_object['uuid']
491 self._poll_time = poll_time
493 def createDirectory(self, i):
494 if collection_uuid_pattern.match(i['uuid']):
495 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
496 elif group_uuid_pattern.match(i['uuid']):
497 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
498 elif link_uuid_pattern.match(i['uuid']):
499 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
500 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
503 elif uuid_pattern.match(i['uuid']):
504 return ObjectFile(self.parent_inode, i)
512 if self.project_object_file == None:
513 self.project_object_file = ObjectFile(self.inode, self.project_object)
514 self.inodes.add_entry(self.project_object_file)
518 if i['name'] is None or len(i['name']) == 0:
520 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
521 # collection or subproject
523 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
526 elif 'kind' in i and i['kind'].startswith('arvados#'):
528 return "{}.{}".format(i['name'], i['kind'][8:])
533 if isinstance(a, CollectionDirectory):
534 return a.collection_locator == i['uuid']
535 elif isinstance(a, ProjectDirectory):
536 return a.uuid == i['uuid']
537 elif isinstance(a, ObjectFile):
538 return a.uuid == i['uuid'] and not a.stale()
541 with llfuse.lock_released:
542 if group_uuid_pattern.match(self.uuid):
543 self.project_object = self.api.groups().get(
544 uuid=self.uuid).execute(num_retries=self.num_retries)
545 elif user_uuid_pattern.match(self.uuid):
546 self.project_object = self.api.users().get(
547 uuid=self.uuid).execute(num_retries=self.num_retries)
549 contents = arvados.util.list_all(self.api.groups().contents,
550 self.num_retries, uuid=self.uuid)
552 # end with llfuse.lock_released, re-acquire lock
557 self.createDirectory)
559 def __getitem__(self, item):
561 if item == '.arvados#project':
562 return self.project_object_file
564 return super(ProjectDirectory, self).__getitem__(item)
566 def __contains__(self, k):
567 if k == '.arvados#project':
570 return super(ProjectDirectory, self).__contains__(k)
576 class SharedDirectory(Directory):
577 """A special directory that represents users or groups who have shared projects with me."""
579 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
580 poll=False, poll_time=60):
581 super(SharedDirectory, self).__init__(parent_inode, inodes)
583 self.num_retries = num_retries
584 self.current_user = api.users().current().execute(num_retries=num_retries)
586 self._poll_time = poll_time
589 with llfuse.lock_released:
590 all_projects = arvados.util.list_all(
591 self.api.groups().list, self.num_retries,
592 filters=[['group_class','=','project']])
594 for ob in all_projects:
595 objects[ob['uuid']] = ob
599 for ob in all_projects:
600 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
602 root_owners[ob['owner_uuid']] = True
604 lusers = arvados.util.list_all(
605 self.api.users().list, self.num_retries,
606 filters=[['uuid','in', list(root_owners)]])
607 lgroups = arvados.util.list_all(
608 self.api.groups().list, self.num_retries,
609 filters=[['uuid','in', list(root_owners)]])
615 objects[l["uuid"]] = l
617 objects[l["uuid"]] = l
620 for r in root_owners:
624 contents[obr["name"]] = obr
625 if "first_name" in obr:
626 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
629 if r['owner_uuid'] not in objects:
630 contents[r['name']] = r
632 # end with llfuse.lock_released, re-acquire lock
635 self.merge(contents.items(),
637 lambda a, i: a.uuid == i[1]['uuid'],
638 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))