# flush_data: runs the superclass flush, then adds the drop in
# self._data_buffer_len (bytes actually PUT) to self.bytes_written, reports
# progress, and calls cache_state() when the running total reaches a new
# multiple of the Keep block size. This patch takes that block size from
# arvados.config.KEEP_BLOCK_SIZE instead of self.KEEP_BLOCK_SIZE.
# NOTE(review): indentation is lost in this diff; the progress/cache lines
# are presumed to be nested under the `if`. The block-count comparison
# relies on `/` being integer division (Python 2 semantics, consistent with
# `print >>stderr` elsewhere in this patch); under Python 3 it would yield
# floats and cache_state() would run after almost every flush, so `//`
# would be the portable spelling.
def flush_data(self):
start_buffer_len = self._data_buffer_len
- start_block_count = self.bytes_written / self.KEEP_BLOCK_SIZE
+ start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
super(ArvPutCollectionWriter, self).flush_data()
if self._data_buffer_len < start_buffer_len: # We actually PUT data.
self.bytes_written += (start_buffer_len - self._data_buffer_len)
self.report_progress()
- if (self.bytes_written / self.KEEP_BLOCK_SIZE) > start_block_count:
+ if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
self.cache_state()
# NOTE(review): hunk boundary. The body of _record_new_input is not part of
# this chunk; the lines after its header come from a later hunk of the same
# file (presumably arv-put's main(); confirm against the full file).
def _record_new_input(self, input_type, source_name, dest_name):
print >>stderr, error
sys.exit(1)
- # Apply default replication, if none specified. TODO (#3410): Use
- # default replication given by discovery document.
- if args.replication <= 0:
- args.replication = 2
+ # write_copies diverges from args.replication here.
+ # args.replication is how many copies we will instruct Arvados to
+ # maintain (by passing it in collections().create()) after all
+ # data is written -- and if None was given, we'll use None there.
+ # Meanwhile, write_copies is how many copies of each data block we
+ # write to Keep, which has to be a number.
+ #
+ # If we simply changed args.replication from None to a default
+ # here, we'd end up erroneously passing the default replication
+ # level (instead of None) to collections().create().
+ write_copies = (args.replication or
+ api_client._rootDesc.get('defaultCollectionReplication', 2))
# NOTE(review): the removed code replaced any args.replication <= 0 with 2.
# `args.replication or ...` only falls back for falsy values (None, 0), so a
# negative value would now be used as write_copies unchanged; confirm the
# argument parser rejects negatives. The fallback reads the
# 'defaultCollectionReplication' key through the client's private
# _rootDesc attribute (presumably the discovery document named in the
# removed TODO), defaulting to 2 when the key is absent.
if args.progress:
reporter = progress_writer(human_progress)
writer = ArvPutCollectionWriter(
resume_cache, reporter, bytes_expected,
num_retries=args.retries,
- replication=args.replication)
+ replication=write_copies)
else:
writer = ArvPutCollectionWriter.from_cache(
resume_cache, reporter, bytes_expected,
num_retries=args.retries,
- replication=args.replication)
+ replication=write_copies)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
manifest_text = CollectionReader(manifest_text).manifest_text(normalize=True)
# Prefer the 'replication_desired' Collection property; fall back to the
# older 'redundancy' name when the server's schema does not define it.
replication_attr = 'replication_desired'
if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
- # API calls it 'redundancy' until #3410.
+ # API called it 'redundancy' before #3410.
replication_attr = 'redundancy'
# Register the resulting collection in Arvados.
collection = api_client.collections().create(
# appear as underscores in the fuse mount.)
# The pattern's character class matches NUL (\x00) and '/'.
_disallowed_filename_characters = re.compile('[\x00/]')
# NOTE(review): this patch deletes the SafeApi class below in full (every
# line is a '-' line). It kept one arvados.api client and one KeepClient per
# thread in a threading.local, with all KeepClients sharing a single
# KeepBlockCache, and proxied unknown attributes to the calling thread's API
# client. Its replacement, if any, is not visible in this chunk.
# Its __getattr__ fallback called super(SafeApi, self).__getattr__, which
# object does not define, so a missed lookup raised an AttributeError about
# '__getattr__' rather than about the requested name.
-class SafeApi(object):
- """Threadsafe wrapper for API object.
-
- This stores and returns a different api object per thread, because
- httplib2 which underlies apiclient is not threadsafe.
- """
-
- def __init__(self, config):
- self.host = config.get('ARVADOS_API_HOST')
- self.api_token = config.get('ARVADOS_API_TOKEN')
- self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
- self.local = threading.local()
- self.block_cache = arvados.KeepBlockCache()
-
- def localapi(self):
- if 'api' not in self.local.__dict__:
- self.local.api = arvados.api(
- version='v1',
- host=self.host, token=self.api_token, insecure=self.insecure)
- return self.local.api
-
- def localkeep(self):
- if 'keep' not in self.local.__dict__:
- self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
- return self.local.keep
-
- def __getattr__(self, name):
- # Proxy nonexistent attributes to the local API client.
- try:
- return getattr(self.localapi(), name)
- except AttributeError:
- return super(SafeApi, self).__getattr__(name)
-
-
# convertTime: the patch adds an early return so falsy input (None, '')
# yields 0 without reaching strptime. Other values are parsed with the fixed
# format "%Y-%m-%dT%H:%M:%SZ" and converted with calendar.timegm, which
# treats the parsed time as UTC. The body of the except branch is outside
# this chunk.
def convertTime(t):
- '''Parse Arvados timestamp to unix time.'''
+ """Parse Arvados timestamp to unix time."""
+ if not t:
+ return 0
try:
return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
except (TypeError, ValueError):
# NOTE(review): fragment of a directory constructor (presumably
# CollectionDirectory.__init__; the start of the method is outside this
# chunk). The patch caches a modification time in self._mtime. It is parsed
# from collection['modified_at'] when a record dict is passed, and is 0 when
# only a locator is passed.
self.collection_object = None
if isinstance(collection, dict):
self.collection_locator = collection['uuid']
+ self._mtime = convertTime(collection.get('modified_at'))
else:
self.collection_locator = collection
+ self._mtime = 0
# same: true when record i matches this directory's collection_locator by
# either its 'uuid' or its 'portable_data_hash'.
def same(self, i):
return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
# new_collection: stores the new collection record and, with this patch,
# refreshes the cached self._mtime from its 'modified_at' (0 when absent,
# given convertTime's new early return for falsy input). The rest of the
# method is outside this chunk.
def new_collection(self, new_collection_object, coll_reader):
self.collection_object = new_collection_object
+ self._mtime = convertTime(self.collection_object.get('modified_at'))
+
if self.collection_object_file is not None:
self.collection_object_file.update(self.collection_object)
return super(CollectionDirectory, self).__contains__(k)
- def mtime(self):
- self.checkupdate()
- return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
-
# NOTE(review): two hunks. The first shows only the start of
# MagicDirectory's header and docstring. The second lists a group's contents
# via groups().contents; the patch drops the extra links().list query for
# pre-#3036 'name' links, so such links are no longer merged into the
# listing.
class MagicDirectory(Directory):
'''A special directory that logically contains the set of all extant keep
contents = arvados.util.list_all(self.api.groups().contents,
self.num_retries, uuid=self.uuid)
- # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
- contents += arvados.util.list_all(
- self.api.links().list, self.num_retries,
- filters=[['tail_uuid', '=', self.uuid],
- ['link_class', '=', 'name']])
# end with llfuse.lock_released, re-acquire lock
# arv-mount.
# The workaround is to implement it with the proper number of parameters,
# and then everything works out.
# The patch only renames the placeholder parameters p1..p5 to descriptive
# names; every call still raises llfuse.FUSEError(errno.EROFS) (read-only
# file system).
- def create(self, p1, p2, p3, p4, p5):
+ def create(self, inode_parent, name, mode, flags, ctx):
raise llfuse.FUSEError(errno.EROFS)