@classmethod
def setup_user_cache(cls):
    """Ensure the per-user cache directory exists with private permissions.

    Permissions are tightened to 0700 only when this call actually
    creates the directory; a pre-existing directory is left untouched.
    """
    try:
        os.makedirs(cls.CACHE_DIR)
    except OSError as err:
        if err.errno == errno.EEXIST:
            return  # Directory already present; keep its current mode.
        raise
    # Freshly created: restrict to the owner, since it holds upload state.
    os.chmod(cls.CACHE_DIR, 0o700)
+
def __init__(self, file_spec):
    """Open the cache file at *file_spec* and take an exclusive lock.

    Mode 'a+' creates the file when it does not exist without
    truncating previously cached state.  _lock_file raises
    ResumeCacheConflict if another process already holds the lock;
    note self.cache_file is assigned before locking, so the handle is
    reachable even when that happens.
    """
    self.cache_file = open(file_spec, 'a+')
    self._lock_file(self.cache_file)
    # Resolved name is kept so restart() can re-open the same path.
    self.filename = self.cache_file.name
+
@classmethod
def make_path(cls, args):
    """Return a deterministic cache-file path for this invocation.

    The name is the MD5 digest of the API host, the sorted real paths
    of the inputs, and (when any input is a directory) the effective
    manifest depth, or else the output filename — so the same upload
    request always maps to the same cache file.
    """
    md5 = hashlib.md5()
    # hashlib.update() requires bytes on Python 3 (passing str raises
    # TypeError), so encode every component explicitly.
    md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
    realpaths = sorted(os.path.realpath(path) for path in args.paths)
    md5.update(''.join(realpaths).encode())
    if any(os.path.isdir(path) for path in realpaths):
        # Depth changes which files land in the manifest; clamp to -1
        # so "unlimited" values hash consistently.
        md5.update(str(max(args.max_manifest_depth, -1)).encode())
    elif args.filename:
        md5.update(args.filename.encode())
    return os.path.join(cls.CACHE_DIR, md5.hexdigest())
+
def _lock_file(self, fileobj):
    """Take an exclusive, non-blocking flock on *fileobj*.

    LOCK_NB means we fail immediately — raising ResumeCacheConflict —
    when another process already holds the lock, instead of waiting.
    """
    flags = fcntl.LOCK_EX | fcntl.LOCK_NB
    try:
        fcntl.flock(fileobj, flags)
    except IOError:
        raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
def load(self):
    """Rewind the cache file and return its JSON-decoded contents."""
    handle = self.cache_file
    handle.seek(0)
    return json.load(handle)
+
def save(self, data):
    """Atomically replace the cache contents with JSON-serialized *data*.

    Writes to a locked temp file in the same directory, then renames it
    over the current cache file so readers never observe a partial
    write.  Failures are best-effort: the previous cache stays intact
    and no exception propagates to the caller.
    """
    try:
        new_cache_fd, new_cache_name = tempfile.mkstemp(
            dir=os.path.dirname(self.filename))
        self._lock_file(new_cache_fd)
        new_cache = os.fdopen(new_cache_fd, 'r+')
        json.dump(data, new_cache)
        # Flush the buffered writer BEFORE rename publishes the file;
        # otherwise a crash after the rename leaves a truncated cache.
        new_cache.flush()
        os.rename(new_cache_name, self.filename)
    except (IOError, OSError, ResumeCacheConflict):
        # Best-effort cleanup of the temp file; the old cache survives.
        try:
            os.unlink(new_cache_name)
        except NameError:  # mkstemp failed.
            pass
    else:
        self.cache_file.close()
        self.cache_file = new_cache
+
def close(self):
    """Close the cache file handle, releasing its flock with it."""
    self.cache_file.close()
+
def destroy(self):
    """Remove the cache file from disk, then release the handle.

    A missing file is already the desired end state, so ENOENT is
    ignored; any other unlink failure propagates.
    """
    try:
        os.unlink(self.filename)
    except OSError as err:
        if err.errno == errno.ENOENT:
            pass  # Already gone — exactly what we wanted.
        else:
            raise
    self.close()
+
def restart(self):
    """Delete the on-disk cache, then re-run __init__ to start fresh
    at the same path (re-creating and re-locking the file)."""
    self.destroy()
    self.__init__(self.filename)
+
+
+class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
+ STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
+ ['bytes_written', '_seen_inputs'])
+
def __init__(self, cache=None, reporter=None, bytes_expected=None):
    """Set up progress/state tracking, then initialize the base writer.

    reporter, when given, is called as reporter(bytes_written,
    bytes_expected); otherwise a no-op stand-in is installed so the
    write path can invoke report_progress unconditionally.
    """
    self.bytes_written = 0
    self._seen_inputs = []
    self.cache = cache
    self.report_progress = (
        reporter if reporter is not None
        else (lambda bytes_w, bytes_e: None))
    self.bytes_expected = bytes_expected
    super(ArvPutCollectionWriter, self).__init__()
+
@classmethod
def from_cache(cls, cache, reporter=None, bytes_expected=None):
    """Rebuild a writer from cached state, or start fresh if unusable.

    Any load/decode failure (missing or corrupt JSON, stale writer
    state) falls back to a brand-new writer bound to the same cache.
    """
    try:
        state = cache.load()
        # The data buffer was serialized as base64 text; restore it to
        # the single-element byte-buffer list the writer expects.
        state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
        writer = cls.from_state(state, cache, reporter, bytes_expected)
    except (TypeError, ValueError,
            arvados.errors.StaleWriterStateError):
        return cls(cache, reporter, bytes_expected)
    else:
        return writer