import threading
from apiclient import errors as apiclient_errors
import errno
+import time
-from fusefile import StringFile, ObjectFile, FuseArvadosFile
+from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
from fresh import FreshBase, convertTime, use_counter, check_update
import arvados.collection
job output.
"""
+ class UnsaveableCollection(arvados.collection.Collection):
+ def save(self):
+ pass
+ def save_new(self):
+ pass
+
def __init__(self, parent_inode, inodes, api_client, num_retries):
- collection = arvados.collection.Collection(
+ collection = self.UnsaveableCollection(
api_client=api_client,
keep_client=api_client.keep)
- collection.save = self._commit_collection
- collection.save_new = self._commit_collection
super(TmpCollectionDirectory, self).__init__(
parent_inode, inodes, collection)
self.collection_record_file = None
- self._subscribed = False
- self._update_collection_record()
-
- def update(self, *args, **kwargs):
- if not self._subscribed:
- with llfuse.lock_released:
- self.populate(self.mtime())
- self._subscribed = True
-
- @use_counter
- def _commit_collection(self):
- """Commit the data blocks, but don't save the collection to API.
+ self.populate(self.mtime())
- Update the content of the special .arvados#collection file, if
- it has been instantiated.
- """
- self.collection.flush()
- self._update_collection_record()
- if self.collection_record_file is not None:
- self.collection_record_file.update(self.collection_record)
+ def on_event(self, *args, **kwargs):
+ super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
+ if self.collection_record_file:
+ with llfuse.lock:
+ self.collection_record_file.invalidate()
self.inodes.invalidate_inode(self.collection_record_file.inode)
+ _logger.debug("%s invalidated collection record", self)
- def _update_collection_record(self):
- self.collection_record = {
- "uuid": None,
- "manifest_text": self.collection.manifest_text(),
- "portable_data_hash": self.collection.portable_data_hash(),
- }
+ def collection_record(self):
+ with llfuse.lock_released:
+ return {
+ "uuid": None,
+ "manifest_text": self.collection.manifest_text(),
+ "portable_data_hash": self.collection.portable_data_hash(),
+ }
def __contains__(self, k):
return (k == '.arvados#collection' or
def __getitem__(self, item):
if item == '.arvados#collection':
if self.collection_record_file is None:
- self.collection_record_file = ObjectFile(
+ self.collection_record_file = FuncToJSONFile(
self.inode, self.collection_record)
self.inodes.add_entry(self.collection_record_file)
return self.collection_record_file
return super(TmpCollectionDirectory, self).__getitem__(item)
+ def persisted(self):
+ return False
+
def writable(self):
return True
def finalize(self):
self.collection.stop_threads()
+ def invalidate(self):
+ if self.collection_record_file:
+ self.collection_record_file.invalidate()
+ super(TmpCollectionDirectory, self).invalidate()
+
class MagicDirectory(Directory):
"""A special directory that logically contains the set of all extant keep locators.
-import logging
-import re
import json
import llfuse
+import logging
+import re
+import time
from fresh import FreshBase, convertTime
def persisted(self):
return True
+
+
class FuncToJSONFile(StringFile):
    """File whose content is a function's return value, encoded as JSON.

    The function runs the first time the file's size or data is
    requested; the resulting JSON text is cached until invalidate()
    marks this file stale again.
    """
    def __init__(self, parent_inode, func):
        super(FuncToJSONFile, self).__init__(parent_inode, "", 0)
        self.func = func

    def size(self):
        self._update()
        return super(FuncToJSONFile, self).size()

    def readfrom(self, *args, **kwargs):
        self._update()
        return super(FuncToJSONFile, self).readfrom(*args, **kwargs)

    def _update(self):
        """Refresh self.contents from func() if this file is stale."""
        if self.stale():
            # The mtime is stamped before func() runs — presumably so an
            # update arriving mid-call can still re-mark us stale; confirm.
            self._mtime = time.time()
            self.contents = json.dumps(
                self.func(), indent=4, sort_keys=True) + "\n"
            self.fresh()