total data size).
""")
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument('--resume', action='store_true', default=True,
+ help="""
+ Continue interrupted uploads from cached state (default).
+ """)
+ group.add_argument('--no-resume', action='store_false', dest='resume',
+ help="""
+ Do not continue interrupted uploads from cached state.
+ """)
+
args = parser.parse_args(arguments)
if len(args.paths) == 0:
class ResumeCache(object):
CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
+ @classmethod
+ def setup_user_cache(cls):
+ # Ensure the per-user upload-state directory (cls.CACHE_DIR,
+ # ~/.cache/arvados/arv-put) exists, tolerating a concurrent or prior
+ # creation (EEXIST) while re-raising any other OSError.
+ # NOTE(review): the original indentation (collapsed here) determines
+ # whether the chmod(0o700) runs on the try's else branch (after a
+ # fresh makedirs) or inside the except when the directory already
+ # existed — upstream intent is try/else; confirm when applying.
+ try:
+ os.makedirs(cls.CACHE_DIR)
+ except OSError as error:
+ if error.errno != errno.EEXIST:
+ raise
+ else:
+ os.chmod(cls.CACHE_DIR, 0o700)
+
def __init__(self, file_spec):
try:
self.cache_file = open(file_spec, 'a+')
raise
self.close()
+ def restart(self):
+ # Discard the cached upload state and reinitialize this object with a
+ # fresh cache file at the same path, so the upload starts over.
+ # NOTE(review): assumes __init__ stores its file_spec argument as
+ # self.filename — not visible in this hunk; confirm against the full
+ # ResumeCache.__init__.
+ self.destroy()
+ self.__init__(self.filename)
-class ResumeCacheCollectionWriter(arvados.ResumableCollectionWriter):
- def __init__(self, cache=None):
+
+class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
+ # Resumable collection writer extended with progress reporting: holds a
+ # ResumeCache, an optional reporter callback, and byte counters.
+ def __init__(self, cache=None, reporter=None, bytes_expected=None):
+ self.__init_locals__(cache, reporter, bytes_expected)
+ super(ArvPutCollectionWriter, self).__init__()
+
+ def __init_locals__(self, cache, reporter, bytes_expected):
+ # Set only the per-instance attributes, without touching inherited
+ # writer state; called from __init__ and also by from_cache() to
+ # attach these attributes to a writer rebuilt via from_state().
 self.cache = cache
- super(ResumeCacheCollectionWriter, self).__init__()
+ self.report_func = reporter
+ self.bytes_written = 0
+ self.bytes_expected = bytes_expected
 @classmethod
- def from_cache(cls, cache):
+ def from_cache(cls, cache, reporter=None, bytes_expected=None):
+ # Rebuild a writer from previously cached state when possible;
+ # otherwise (empty/invalid/stale cache) fall back to a brand-new
+ # writer bound to the same cache, reporter, and expected byte count.
+ # The buffered data was base64-encoded for JSON serialization, so it
+ # is decoded here before handing the state to from_state().
 try:
 state = cache.load()
 state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
 writer = cls.from_state(state)
 except (TypeError, ValueError,
 arvados.errors.StaleWriterStateError) as error:
- return cls(cache)
+ return cls(cache, reporter, bytes_expected)
 else:
- writer.cache = cache
+ writer.__init_locals__(cache, reporter, bytes_expected)
 return writer
def checkpoint_state(self):
state[attr] = list(value)
self.cache.save(state)
-
-class CollectionWriterWithProgress(arvados.CollectionWriter):
- def flush_data(self, *args, **kwargs):
- if not getattr(self, 'display_type', None):
- return
- if not hasattr(self, 'bytes_flushed'):
- self.bytes_flushed = 0
- self.bytes_flushed += self._data_buffer_len
- super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
- self.bytes_flushed -= self._data_buffer_len
- if self.display_type == 'machine':
- sys.stderr.write('%s %d: %d written %d total\n' %
- (sys.argv[0],
- os.getpid(),
- self.bytes_flushed,
- getattr(self, 'bytes_expected', -1)))
- elif getattr(self, 'bytes_expected', 0) > 0:
- pct = 100.0 * self.bytes_flushed / self.bytes_expected
- sys.stderr.write('\r%dM / %dM %.1f%% ' %
- (self.bytes_flushed >> 20,
- self.bytes_expected >> 20, pct))
- else:
- sys.stderr.write('\r%d ' % self.bytes_flushed)
-
- def manifest_text(self, *args, **kwargs):
- manifest_text = (super(CollectionWriterWithProgress, self)
- .manifest_text(*args, **kwargs))
- if getattr(self, 'display_type', None):
- if self.display_type == 'human':
- sys.stderr.write('\n')
- self.display_type = None
- return manifest_text
+ def flush_data(self):
+ # Flush buffered data via the parent class, add the number of bytes
+ # that actually left the buffer to the running total, then invoke the
+ # progress callback (if any) with (bytes_written, bytes_expected).
+ bytes_buffered = self._data_buffer_len
+ super(ArvPutCollectionWriter, self).flush_data()
+ self.bytes_written += (bytes_buffered - self._data_buffer_len)
+ if self.report_func is not None:
+ self.report_func(self.bytes_written, self.bytes_expected)
def expected_bytes_for(pathlist):
+ # Walk the given directory trees and stat files, adding up file sizes,
+ # so we can display progress as percent
bytesum = 0
for path in pathlist:
if os.path.isdir(path):
else:
return "\r{} ".format(bytes_written)
+def progress_writer(progress_func, outfile=sys.stderr):
+ # Build a reporter callback for ArvPutCollectionWriter: it formats the
+ # byte counts with progress_func (e.g. human_progress or
+ # machine_progress) and writes the result to outfile (stderr by
+ # default, so progress never mixes with the locator printed on stdout).
+ def write_progress(bytes_written, bytes_expected):
+ outfile.write(progress_func(bytes_written, bytes_expected))
+ return write_progress
+
def main(arguments=None):
+ ResumeCache.setup_user_cache()
args = parse_arguments(arguments)
if args.progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'human'
+ reporter = progress_writer(human_progress)
elif args.batch_progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'machine'
+ reporter = progress_writer(machine_progress)
else:
- writer = arvados.CollectionWriter()
+ reporter = None
- # Walk the given directory trees and stat files, adding up file sizes,
- # so we can display progress as percent
- writer.bytes_expected = expected_bytes_for(args.paths)
- if writer.bytes_expected is None:
- del writer.bytes_expected
+ try:
+ resume_cache = ResumeCache(args)
+ if not args.resume:
+ resume_cache.restart()
+ except ResumeCacheConflict:
+ print "arv-put: Another process is already uploading this data."
+ sys.exit(1)
+
+ writer = ArvPutCollectionWriter.from_cache(
+ resume_cache, reporter, expected_bytes_for(args.paths))
# Copy file data to Keep.
for path in args.paths:
# Print the locator (uuid) of the new collection.
print writer.finish()
+ resume_cache.destroy()
if __name__ == '__main__':
main()